Skip to content

Commit 090cb8f

Browse files
committed
audioミキサーの新しいMediaProcessor実装とInputStream構造体を追加
1 parent c67cfdd commit 090cb8f

File tree

1 file changed

+181
-0
lines changed

1 file changed

+181
-0
lines changed

src/mixer_audio.rs

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use crate::{
1111
},
1212
channel::{self, ErrorFlag},
1313
layout::Layout,
14+
media::{MediaSample, MediaStreamId},
1415
metadata::SourceId,
16+
processor::{MediaProcessor, MediaProcessorInput, MediaProcessorOutput, MediaProcessorSpec},
1517
stats::{AudioMixerStats, ProcessorStats, Seconds, SharedStats},
1618
};
1719

@@ -216,3 +218,182 @@ impl AudioMixerThread {
216218
})
217219
}
218220
}
221+
222+
#[derive(Debug, Default)]
223+
struct InputStream {
224+
eos: bool,
225+
sample_queue: VecDeque<(i16, i16)>,
226+
start_timestamp: Option<Duration>,
227+
}
228+
229+
#[derive(Debug)]
230+
pub struct AudioMixer {
231+
layout: Layout,
232+
input_streams: HashMap<MediaStreamId, InputStream>,
233+
output_stream_id: MediaStreamId,
234+
stats: AudioMixerStats,
235+
}
236+
237+
impl AudioMixer {
238+
pub fn new(
239+
layout: Layout,
240+
input_stream_ids: Vec<MediaStreamId>,
241+
output_stream_id: MediaStreamId,
242+
) -> Self {
243+
Self {
244+
layout,
245+
input_streams: input_stream_ids
246+
.into_iter()
247+
.map(|id| (id, InputStream::default()))
248+
.collect(),
249+
output_stream_id,
250+
stats: AudioMixerStats::default(),
251+
}
252+
}
253+
254+
/*
255+
fn next_input_timestamp(&self) -> Duration {
256+
Duration::from_secs(
257+
self.stats.total_output_sample_count.get()
258+
+ self.stats.total_trimmed_sample_count.get(),
259+
) / SAMPLE_RATE as u32
260+
}
261+
262+
fn next_output_timestamp(&self) -> Duration {
263+
Duration::from_secs(self.stats.total_output_sample_count.get()) / SAMPLE_RATE as u32
264+
}
265+
266+
fn fill_input_queue(
267+
&mut self,
268+
now: Duration,
269+
input_stream_id: MediaStreamId,
270+
) -> orfail::Result<()> {
271+
let input_stream = &mut self.input_streams[&input_stream_id];
272+
while let Some(data) = input_rx.peek() {
273+
if !self.input_sample_queues.contains_key(source_id) && now < data.timestamp {
274+
// まだ再生時刻に達していない
275+
break;
276+
} else if !self.input_sample_queues.contains_key(source_id) {
277+
// 再生時刻に達した
278+
//
279+
// 以後は、データのタイムスタンプにギャップがあったとしても
280+
// 連続しているものとして扱う
281+
// (Chrome を含む多くのブラウザがこの挙動なのと、
282+
// ギャップ部分のハンドリングは Sora 側の責務であるため。
283+
// 下手に Hisui 側でハンドリングしてしまうと、ギャップが
284+
// 極端に大きいためにあえて Sora がそのまま放置した区間を
285+
// 埋めようとしてディスクやメモリを食いつぶしてしまう恐れがある)
286+
self.input_sample_queues
287+
.insert(source_id.clone(), VecDeque::new());
288+
}
289+
290+
// サンプルキューに要素を追加する
291+
//
292+
// 想定外の入力が来ていないかを念のためにチェックする
293+
// (format と stereo については stereo_samples() の中でチェックしている)
294+
(data.sample_rate == SAMPLE_RATE).or_fail()?;
295+
296+
let queue = self.input_sample_queues.get_mut(source_id).or_fail()?;
297+
queue.extend(data.stereo_samples().or_fail()?);
298+
299+
// 処理した要素を取りだす
300+
let _ = input_rx.recv();
301+
self.stats.total_input_audio_data_count.add(1);
302+
303+
if queue.len() >= MIXED_AUDIO_DATA_SAMPLES {
304+
// 次の合成処理に必要な分のサンプルは溜った
305+
break;
306+
}
307+
}
308+
Ok(())
309+
}
310+
311+
fn next_data(&mut self) -> orfail::Result<Option<AudioData>> {
312+
let mut now = self.next_input_timestamp();
313+
while self.layout.is_in_trim_span(now) {
314+
self.stats
315+
.total_trimmed_sample_count
316+
.add(MIXED_AUDIO_DATA_SAMPLES as u64);
317+
now = self.next_input_timestamp();
318+
}
319+
320+
for mut input_rx in std::mem::take(&mut self.input_rxs) {
321+
self.fill_input_queue(now, &mut input_rx).or_fail()?;
322+
if input_rx.peek().is_some() {
323+
self.input_rxs.push(input_rx);
324+
}
325+
}
326+
327+
if self.is_eos() {
328+
// 全部のソースが終端に達した
329+
return Ok(None);
330+
}
331+
332+
let (result, elapsed) = Seconds::elapsed(|| self.mix_next_audio_data().or_fail().map(Some));
333+
self.stats.total_processing_seconds.add(elapsed);
334+
result
335+
}
336+
337+
fn is_eos(&self) -> bool {
338+
self.input_rxs.is_empty()
339+
&& self
340+
.input_sample_queues
341+
.values()
342+
.all(|queue| queue.is_empty())
343+
}
344+
345+
fn mix_next_audio_data(&mut self) -> orfail::Result<AudioData> {
346+
let timestamp = self.next_output_timestamp();
347+
348+
let bytes_per_sample = CHANNELS as usize * 2; // i16 で表現するので *2
349+
let mut mixed_samples = Vec::with_capacity(MIXED_AUDIO_DATA_SAMPLES * bytes_per_sample);
350+
351+
let mut filled = true; // 無音補完されたかどうか
352+
for _ in 0..MIXED_AUDIO_DATA_SAMPLES {
353+
let mut acc_left = 0;
354+
let mut acc_right = 0;
355+
for queue in self.input_sample_queues.values_mut() {
356+
let Some((left, right)) = queue.pop_front() else {
357+
continue;
358+
};
359+
acc_left += left as i32;
360+
acc_right += right as i32;
361+
filled = false;
362+
}
363+
364+
let left = acc_left.clamp(i16::MIN as i32, i16::MAX as i32) as i16;
365+
let right = acc_right.clamp(i16::MIN as i32, i16::MAX as i32) as i16;
366+
367+
mixed_samples.extend_from_slice(&left.to_be_bytes());
368+
mixed_samples.extend_from_slice(&right.to_be_bytes());
369+
}
370+
371+
self.stats.total_output_audio_data_count.add(1);
372+
self.stats
373+
.total_output_audio_data_seconds
374+
.add(Seconds::new(MIXED_AUDIO_DATA_DURATION));
375+
self.stats
376+
.total_output_sample_count
377+
.add(MIXED_AUDIO_DATA_SAMPLES as u64);
378+
if filled {
379+
self.stats
380+
.total_output_filled_sample_count
381+
.add(MIXED_AUDIO_DATA_SAMPLES as u64);
382+
}
383+
384+
Ok(AudioData {
385+
// 以下は固定値
386+
source_id: None, // 合成後は常に None になる
387+
format: AudioFormat::I16Be,
388+
stereo: true, // Hisui では音声は常にステレオとして扱う
389+
sample_rate: SAMPLE_RATE,
390+
duration: MIXED_AUDIO_DATA_DURATION,
391+
sample_entry: None, // 生データにはサンプルエントリーはない
392+
393+
// 以下は合成結果に応じた値
394+
data: mixed_samples,
395+
timestamp,
396+
})
397+
}
398+
*/
399+
}

0 commit comments

Comments
 (0)