Skip to content

Commit e83166e

Browse files
authored
fix: auto management odd pauses in consumer offset (#4504)
* fix: auto management odd pauses in consumer offset * feat: add flusher_check_period in consumer builder
1 parent 5acfcb0 commit e83166e

File tree

6 files changed

+522
-310
lines changed

6 files changed

+522
-310
lines changed

crates/fluvio-test/src/tests/consumer_offsets/auto/flush.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@ pub async fn test_strategy_auto_periodic_flush(
4343
for _ in 0..10 {
4444
ensure_read(&mut stream, &mut counts).await?;
4545
}
46-
sleep(Duration::from_secs(2)).await; // yeild to drive auto flush flow
46+
sleep(Duration::from_secs(3)).await; // yeild to drive auto flush flow
4747

48-
let consumer = find_consumer(client, &consumer_id, 0).await?;
49-
ensure!(consumer.is_some());
50-
ensure!(consumer.unwrap().offset > 0i64);
48+
for (partition, offset) in counts.iter().enumerate().take(partitions) {
49+
let consumer = find_consumer(client, &consumer_id, partition).await?;
50+
ensure!(consumer.is_some());
51+
ensure!(consumer.unwrap().offset == *offset);
52+
}
5153

5254
drop(stream); //we keep the stream alive to prevent flush on drop occuring
5355

crates/fluvio/src/consumer/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::{FluvioError, Offset};
1111
use super::MAX_FETCH_BYTES;
1212

1313
const DEFAULT_OFFSET_FLUSH_PERIOD: Duration = Duration::from_secs(10);
14+
const DEFAULT_OFFSET_FLUSHER_CHECK_PERIOD: Duration = Duration::from_millis(100);
1415
const DEFAULT_RETRY_MODE: RetryMode = RetryMode::TryUntil(100);
1516

1617
/// Configures the behavior of consumer fetching and streaming
@@ -93,6 +94,8 @@ pub struct ConsumerConfigExt {
9394
pub offset_strategy: OffsetManagementStrategy,
9495
#[builder(default = "DEFAULT_OFFSET_FLUSH_PERIOD")]
9596
pub offset_flush: Duration,
97+
#[builder(default = "DEFAULT_OFFSET_FLUSHER_CHECK_PERIOD")]
98+
pub offset_flusher_check_period: Duration,
9699
#[builder(default)]
97100
pub disable_continuous: bool,
98101
#[builder(default = "*MAX_FETCH_BYTES")]
@@ -118,6 +121,7 @@ impl ConsumerConfigExt {
118121
Option<String>,
119122
OffsetManagementStrategy,
120123
Duration,
124+
Duration,
121125
) {
122126
let Self {
123127
topic: _,
@@ -131,6 +135,7 @@ impl ConsumerConfigExt {
131135
smartmodule,
132136
offset_strategy,
133137
offset_flush,
138+
offset_flusher_check_period,
134139
retry_mode: _,
135140
} = self;
136141

@@ -147,6 +152,7 @@ impl ConsumerConfigExt {
147152
offset_consumer,
148153
offset_strategy,
149154
offset_flush,
155+
offset_flusher_check_period,
150156
)
151157
}
152158
}
@@ -185,6 +191,7 @@ impl From<ConsumerConfigExt> for ConsumerConfig {
185191
offset_start: _,
186192
offset_strategy: _,
187193
offset_flush: _,
194+
offset_flusher_check_period: _,
188195
disable_continuous,
189196
max_bytes,
190197
isolation,

crates/fluvio/src/consumer/mod.rs

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -73,30 +73,18 @@ pub type BoxConsumerStream =
7373
pub type BoxConsumerStream =
7474
Pin<Box<dyn ConsumerStream<Item = Result<ConsumerRecord, ErrorCode>> + Send + 'static>>;
7575

76+
type ShararedConsumerStream = Arc<Mutex<BoxConsumerStream>>;
77+
78+
type ConsumerFutureOutput = (
79+
ShararedConsumerStream,
80+
Option<Result<(ConsumerRecord, Option<i64>), ErrorCode>>,
81+
);
82+
7683
/// Type alias to access consume stream as a future.
7784
#[cfg(target_arch = "wasm32")]
78-
type BoxConsumerFuture = Pin<
79-
Box<
80-
dyn Future<
81-
Output = (
82-
Arc<Mutex<BoxConsumerStream>>,
83-
Option<Result<(ConsumerRecord, Option<i64>), ErrorCode>>,
84-
),
85-
> + 'static,
86-
>,
87-
>;
85+
type BoxConsumerFuture = Pin<Box<dyn Future<Output = ConsumerFutureOutput> + 'static>>;
8886
#[cfg(not(target_arch = "wasm32"))]
89-
type BoxConsumerFuture = Pin<
90-
Box<
91-
dyn Future<
92-
Output = (
93-
Arc<Mutex<BoxConsumerStream>>,
94-
Option<Result<(ConsumerRecord, Option<i64>), ErrorCode>>,
95-
),
96-
> + Send
97-
+ 'static,
98-
>,
99-
>;
87+
type BoxConsumerFuture = Pin<Box<dyn Future<Output = ConsumerFutureOutput> + Send + 'static>>;
10088

10189
/// An interface for consuming events from a particular partition
10290
///
@@ -617,7 +605,8 @@ where
617605
&self,
618606
config: ConsumerConfigExt,
619607
) -> Result<SinglePartitionConsumerStream<impl Stream<Item = Result<Record, ErrorCode>>>> {
620-
let (offset, config, consumer_id, strategy, flush_period) = config.into_parts();
608+
let (offset, config, consumer_id, strategy, flush_period, flusher_check_period) =
609+
config.into_parts();
621610
let (stream, start_offset, stream_to_server) = self
622611
.inner_stream_batches_with_config(offset, config, consumer_id)
623612
.await?;
@@ -642,6 +631,7 @@ where
642631
flattened,
643632
strategy,
644633
flush_period,
634+
flusher_check_period,
645635
stream_to_server,
646636
))
647637
}

0 commit comments

Comments
 (0)