Skip to content

Commit c67cfdd

Browse files
authored
Merge pull request #50 from shiguredo/feature/processor-trait-writer
ライターに MediaProcessor トレイトを実装する
2 parents f3e73a0 + 03e7e06 commit c67cfdd

File tree

3 files changed

+241
-94
lines changed

3 files changed

+241
-94
lines changed

src/composer.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ use crate::{
99
decoder::VideoDecoderOptions,
1010
encoder::{AudioEncoder, AudioEncoderThread, VideoEncoder, VideoEncoderThread},
1111
layout::Layout,
12-
media::MediaStreamIdGenerator,
12+
media::{MediaSample, MediaStreamId, MediaStreamIdGenerator},
1313
mixer_audio::AudioMixerThread,
1414
mixer_video::VideoMixerThread,
15+
processor::{MediaProcessor, MediaProcessorInput, MediaProcessorOutput},
1516
source::{AudioSourceThread, VideoSourceThread},
1617
stats::{ProcessorStats, Seconds, SharedStats},
1718
types::CodecName,
@@ -70,25 +71,54 @@ impl Composer {
7071
);
7172

7273
// 映像ミキサーとエンコーダーを準備
73-
let encoded_video_rx = self
74+
let mut encoded_video_rx = self
7475
.create_video_mixer_and_encoder(error_flag.clone(), stats.clone(), video_source_rxs)
7576
.or_fail()?;
7677

7778
// 音声ミキサーとエンコーダーを準備
78-
let encoded_audio_rx = self
79+
let mut encoded_audio_rx = self
7980
.create_audio_mixer_and_encoder(error_flag.clone(), stats.clone(), audio_source_rxs)
8081
.or_fail()?;
8182

8283
// 合成後の映像と音声への MP4 への書き出しを行う(この処理は現在のスレッドで行う)
84+
let writer_input_audio_stream_id = MediaStreamId::new(1000); // audio / video で値が異なっていればなんでもいい
85+
let writer_input_video_stream_id = MediaStreamId::new(1001);
86+
8387
let mut mp4_writer = Mp4Writer::new(
8488
out_file_path,
8589
&self.layout,
86-
encoded_audio_rx,
87-
encoded_video_rx,
90+
self.layout
91+
.has_audio()
92+
.then_some(writer_input_audio_stream_id),
93+
self.layout
94+
.has_video()
95+
.then_some(writer_input_video_stream_id),
8896
)
8997
.or_fail()?;
9098

91-
while let Some(timestamp) = mp4_writer.poll().or_fail()? {
99+
loop {
100+
match mp4_writer.process_output().or_fail()? {
101+
MediaProcessorOutput::Finished => break,
102+
MediaProcessorOutput::Pending { awaiting_stream_id }
103+
if awaiting_stream_id == writer_input_audio_stream_id =>
104+
{
105+
let input = MediaProcessorInput {
106+
stream_id: awaiting_stream_id,
107+
sample: encoded_audio_rx.recv().map(MediaSample::audio_data),
108+
};
109+
mp4_writer.process_input(input).or_fail()?;
110+
}
111+
MediaProcessorOutput::Pending { awaiting_stream_id } => {
112+
let input = MediaProcessorInput {
113+
stream_id: awaiting_stream_id,
114+
sample: encoded_video_rx.recv().map(MediaSample::video_frame),
115+
};
116+
mp4_writer.process_input(input).or_fail()?;
117+
}
118+
MediaProcessorOutput::Processed { .. } => unreachable!(),
119+
}
120+
121+
let timestamp = mp4_writer.current_duration();
92122
progress_bar.set_position(timestamp.as_secs());
93123
if error_flag.get() {
94124
// ファイル読み込み、デコード、合成、エンコード、のいずれかで失敗したものがあるとここに来る

src/writer_mp4.rs

Lines changed: 95 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use std::{
2+
collections::VecDeque,
23
fs::File,
34
io::{BufWriter, Seek, SeekFrom, Write},
45
num::NonZeroU32,
56
path::Path,
7+
sync::Arc,
68
time::{Duration, SystemTime},
79
};
810

@@ -17,11 +19,13 @@ use shiguredo_mp4::{
1719
};
1820

1921
use crate::{
20-
audio::{AudioData, AudioDataReceiver},
22+
audio::AudioData,
2123
layout::{Layout, Resolution},
24+
media::{MediaSample, MediaStreamId},
2225
mixer_audio::MIXED_AUDIO_DATA_DURATION,
23-
stats::{Mp4WriterStats, Seconds},
24-
video::{VideoFrame, VideoFrameReceiver},
26+
processor::{MediaProcessor, MediaProcessorInput, MediaProcessorOutput, MediaProcessorSpec},
27+
stats::{Mp4WriterStats, ProcessorStats, Seconds},
28+
video::VideoFrame,
2529
};
2630

2731
// Hisui では出力 MP4 のタイムスケールはマイクロ秒固定にする
@@ -42,8 +46,10 @@ pub struct Mp4Writer {
4246
video_chunks: Vec<Chunk>,
4347
audio_sample_entry: Option<SampleEntry>,
4448
video_sample_entry: Option<SampleEntry>,
45-
input_audio_rx: Option<AudioDataReceiver>,
46-
input_video_rx: Option<VideoFrameReceiver>,
49+
input_audio_stream_id: Option<MediaStreamId>,
50+
input_video_stream_id: Option<MediaStreamId>,
51+
input_audio_queue: VecDeque<Arc<AudioData>>,
52+
input_video_queue: VecDeque<Arc<VideoFrame>>,
4753
finalize_time: Mp4FileTime,
4854
appending_video_chunk: bool,
4955
stats: Mp4WriterStats,
@@ -54,8 +60,8 @@ impl Mp4Writer {
5460
pub fn new<P: AsRef<Path>>(
5561
path: P,
5662
layout: &Layout,
57-
input_audio_rx: AudioDataReceiver,
58-
input_video_rx: VideoFrameReceiver,
63+
input_audio_stream_id: Option<MediaStreamId>,
64+
input_video_stream_id: Option<MediaStreamId>,
5965
) -> orfail::Result<Self> {
6066
let file = std::fs::OpenOptions::new()
6167
.create(true)
@@ -74,8 +80,10 @@ impl Mp4Writer {
7480
audio_sample_entry: None,
7581
video_sample_entry: None,
7682
finalize_time: Mp4FileTime::from_unix_time(Duration::ZERO),
77-
input_audio_rx: layout.has_audio().then_some(input_audio_rx),
78-
input_video_rx: layout.has_video().then_some(input_video_rx),
83+
input_audio_stream_id,
84+
input_video_stream_id,
85+
input_audio_queue: VecDeque::new(),
86+
input_video_queue: VecDeque::new(),
7987
appending_video_chunk: true,
8088
stats: Mp4WriterStats::default(),
8189
};
@@ -92,33 +100,16 @@ impl Mp4Writer {
92100
&self.stats
93101
}
94102

95-
/// 新しい入力(合成後の映像と音声)を待機して、それの出力ファイルへの書き込みを行う
96-
///
97-
/// 結果は現在の書き込み位置を示すタイムスタンプで、全ての書き込みが完了した場合には `Ok(None)` が返される。
98-
pub fn poll(&mut self) -> orfail::Result<Option<Duration>> {
99-
let (audio, video) = self.peek_input_audio_and_video();
100-
let audio_timestamp = audio.map(|x| x.timestamp);
101-
let video_timestamp = video.map(|x| x.timestamp);
102-
103-
let (result, elapsed) = Seconds::elapsed(|| {
104-
self.handle_next_audio_and_video(audio_timestamp, video_timestamp)
105-
.or_fail()
106-
});
107-
self.stats.total_processing_seconds.add(elapsed);
108-
109-
result
110-
}
111-
112103
fn handle_next_audio_and_video(
113104
&mut self,
114105
audio_timestamp: Option<Duration>,
115106
video_timestamp: Option<Duration>,
116-
) -> orfail::Result<Option<Duration>> {
117-
match (audio_timestamp,video_timestamp){
107+
) -> orfail::Result<bool> {
108+
match (audio_timestamp, video_timestamp){
118109
(None, None) => {
119110
// 全部の入力の処理が完了した
120111
self.finalize().or_fail()?;
121-
return Ok(None);
112+
return Ok(false);
122113
}
123114
(None, Some(_)) => {
124115
// 残りは映像のみ
@@ -149,30 +140,19 @@ impl Mp4Writer {
149140
}
150141
}
151142

152-
// 進捗(現在のタイムスタンプ)を呼び出し元に返す
153-
Ok(Some(self.current_duration()))
143+
Ok(true)
154144
}
155145

156-
fn current_duration(&self) -> Duration {
146+
pub fn current_duration(&self) -> Duration {
157147
self.stats
158148
.total_audio_track_seconds
159149
.get_duration()
160150
.max(self.stats.total_video_track_seconds.get_duration())
161151
}
162152

163-
fn peek_input_audio_and_video(&mut self) -> (Option<&AudioData>, Option<&VideoFrame>) {
164-
let audio = self.input_audio_rx.as_mut().and_then(|rx| rx.peek());
165-
let video = self.input_video_rx.as_mut().and_then(|rx| rx.peek());
166-
(audio, video)
167-
}
168-
169153
fn append_video_frame(&mut self, new_chunk: bool) -> orfail::Result<()> {
170154
// 次の入力を取り出す(これは常に成功する)
171-
let frame = self
172-
.input_video_rx
173-
.as_mut()
174-
.and_then(|rx| rx.recv())
175-
.or_fail()?;
155+
let frame = self.input_video_queue.pop_front().or_fail()?;
176156

177157
if self.stats.video_codec.get().is_none()
178158
&& let Some(name) = frame.format.codec_name()
@@ -184,7 +164,7 @@ impl Mp4Writer {
184164
// サンプルエントリーは最初に一回だけ存在する
185165
if self.video_sample_entry.is_none() {
186166
frame.sample_entry.is_some().or_fail()?;
187-
self.video_sample_entry = frame.sample_entry;
167+
self.video_sample_entry = frame.sample_entry.clone();
188168
} else {
189169
frame.sample_entry.is_none().or_fail()?;
190170
}
@@ -223,11 +203,7 @@ impl Mp4Writer {
223203

224204
fn append_audio_data(&mut self, new_chunk: bool) -> orfail::Result<()> {
225205
// 次の入力を取り出す(これは常に成功する)
226-
let data = self
227-
.input_audio_rx
228-
.as_mut()
229-
.and_then(|rx| rx.recv())
230-
.or_fail()?;
206+
let data = self.input_audio_queue.pop_front().or_fail()?;
231207

232208
if self.stats.audio_codec.get().is_none()
233209
&& let Some(name) = data.format.codec_name()
@@ -239,7 +215,7 @@ impl Mp4Writer {
239215
// サンプルエントリーは最初に一回だけ存在する
240216
if self.audio_sample_entry.is_none() {
241217
data.sample_entry.is_some().or_fail()?;
242-
self.audio_sample_entry = data.sample_entry;
218+
self.audio_sample_entry = data.sample_entry.clone();
243219
} else {
244220
data.sample_entry.is_none().or_fail()?;
245221
}
@@ -743,6 +719,75 @@ impl Mp4Writer {
743719
}
744720
}
745721

722+
impl MediaProcessor for Mp4Writer {
723+
fn spec(&self) -> MediaProcessorSpec {
724+
MediaProcessorSpec {
725+
input_stream_ids: self
726+
.input_audio_stream_id
727+
.into_iter()
728+
.chain(self.input_video_stream_id)
729+
.collect(),
730+
output_stream_ids: Vec::new(),
731+
stats: ProcessorStats::Mp4Writer(self.stats.clone()),
732+
}
733+
}
734+
735+
fn process_input(&mut self, input: MediaProcessorInput) -> orfail::Result<()> {
736+
match input.sample {
737+
Some(MediaSample::Audio(sample))
738+
if Some(input.stream_id) == self.input_audio_stream_id =>
739+
{
740+
self.input_audio_queue.push_back(sample);
741+
}
742+
None if Some(input.stream_id) == self.input_audio_stream_id => {
743+
self.input_audio_stream_id = None;
744+
}
745+
Some(MediaSample::Video(sample))
746+
if Some(input.stream_id) == self.input_video_stream_id =>
747+
{
748+
self.input_video_queue.push_back(sample);
749+
}
750+
None if Some(input.stream_id) == self.input_video_stream_id => {
751+
self.input_video_stream_id = None;
752+
}
753+
_ => return Err(orfail::Failure::new("BUG: unexpected input stream")),
754+
}
755+
Ok(())
756+
}
757+
758+
fn process_output(&mut self) -> orfail::Result<MediaProcessorOutput> {
759+
loop {
760+
if let Some(id) = self.input_video_stream_id
761+
&& self.input_video_queue.is_empty()
762+
{
763+
return Ok(MediaProcessorOutput::Pending {
764+
awaiting_stream_id: id,
765+
});
766+
} else if let Some(id) = self.input_audio_stream_id
767+
&& self.input_audio_queue.is_empty()
768+
{
769+
return Ok(MediaProcessorOutput::Pending {
770+
awaiting_stream_id: id,
771+
});
772+
}
773+
774+
let audio_timestamp = self.input_audio_queue.front().map(|x| x.timestamp);
775+
let video_timestamp = self.input_video_queue.front().map(|x| x.timestamp);
776+
777+
let (result, elapsed) = Seconds::elapsed(|| {
778+
self.handle_next_audio_and_video(audio_timestamp, video_timestamp)
779+
.or_fail()
780+
});
781+
782+
self.stats.total_processing_seconds.add(elapsed);
783+
784+
if !result? {
785+
return Ok(MediaProcessorOutput::Finished);
786+
}
787+
}
788+
}
789+
}
790+
746791
#[derive(Debug)]
747792
struct Chunk {
748793
offset: u64,

0 commit comments

Comments
 (0)