Skip to content

Commit 7ee0c1b

Browse files
authored
fix: produce only on available partitions (#4559)
* fix: produce only on available partitions
* chore: move available partition check to a cache
* chore: add terminate flag to indicate end of loop
1 parent 36b058c commit 7ee0c1b

File tree

3 files changed

+220
-24
lines changed

3 files changed

+220
-24
lines changed

crates/fluvio/src/producer/mod.rs

Lines changed: 154 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -7,17 +7,24 @@
77
//! to specific topics and partitions, respectively.
88
//!
99
use std::collections::HashMap;
10+
use std::sync::atomic::AtomicU32;
1011
use std::sync::Arc;
1112

13+
use event_listener::Event;
14+
use tokio::select;
1215
use tracing::instrument;
1316
use async_lock::RwLock;
1417
use anyhow::Result;
1518

19+
use fluvio_future::task::spawn;
20+
use fluvio_future::timer::sleep;
1621
use fluvio_protocol::record::ReplicaKey;
1722
use fluvio_protocol::record::Record;
1823
use fluvio_compression::Compression;
1924
#[cfg(feature = "compress")]
2025
use fluvio_sc_schema::topic::CompressionAlgorithm;
26+
use fluvio_sc_schema::topic::TopicSpec;
27+
use fluvio_sc_schema::partition::PartitionSpec;
2128
use fluvio_types::PartitionId;
2229
use fluvio_types::event::StickyEvent;
2330

@@ -36,6 +43,7 @@ pub use fluvio_protocol::record::{RecordKey, RecordData};
3643

3744
use crate::spu::SpuPool;
3845
use crate::spu::SpuSocketPool;
46+
use crate::sync::StoreContext;
3947
use crate::FluvioError;
4048
use crate::metrics::ClientMetrics;
4149
use crate::producer::accumulator::{RecordAccumulator, PushRecord};
@@ -234,6 +242,7 @@ where
234242
topic: String,
235243
spu_pool: Arc<S>,
236244
record_accumulator: Arc<RecordAccumulator>,
245+
partition_tracker: Arc<PartitionAvailabilityTracker>,
237246
producer_pool: Arc<RwLock<ProducerPool>>,
238247
metrics: Arc<ClientMetrics>,
239248
}
@@ -249,15 +258,16 @@ where
249258
}
250259

251260
async fn push_record(self: Arc<Self>, record: Record) -> Result<PushRecord> {
252-
let topics = self.spu_pool.topics();
261+
let partition_count = self.partition_tracker.partition_count();
262+
let available_partitions = self.partition_tracker.available_partitions();
263+
let available_partitions_lock = available_partitions.read().await;
253264

254-
let topic_spec = topics
255-
.lookup_by_key(&self.topic)
256-
.await?
257-
.ok_or_else(|| FluvioError::TopicNotFound(self.topic.to_string()))?
258-
.spec;
259-
let partition_count = topic_spec.partitions();
260-
let partition_config = PartitionerConfig { partition_count };
265+
let partition_config = PartitionerConfig {
266+
partition_count,
267+
available_partitions: available_partitions_lock.clone(),
268+
};
269+
270+
drop(available_partitions_lock);
261271

262272
let key = record.key.as_ref().map(|k| k.as_ref());
263273
let value = record.value.as_ref();
@@ -402,6 +412,103 @@ cfg_if::cfg_if! {
402412
}
403413
}
404414

415+
/// Tracks the availability of partitions for a given topic
416+
struct PartitionAvailabilityTracker {
417+
topic_name: String,
418+
available_partitions: Arc<RwLock<Vec<PartitionId>>>,
419+
partition_count: AtomicU32,
420+
partitions: StoreContext<PartitionSpec>,
421+
topics: StoreContext<TopicSpec>,
422+
terminate: Arc<Event>,
423+
}
424+
425+
impl Drop for PartitionAvailabilityTracker {
426+
fn drop(&mut self) {
427+
self.terminate.notify(usize::MAX);
428+
}
429+
}
430+
431+
impl PartitionAvailabilityTracker {
432+
// Spawn a task to update available partitions
433+
fn start(
434+
initial_partition_count: u32,
435+
topic_name: String,
436+
partitions: StoreContext<PartitionSpec>,
437+
topics: StoreContext<TopicSpec>,
438+
) -> Arc<Self> {
439+
let tracker = Arc::new(Self {
440+
topic_name,
441+
available_partitions: Arc::new(RwLock::new(vec![])),
442+
partition_count: AtomicU32::new(initial_partition_count),
443+
partitions: partitions.clone(),
444+
topics: topics.clone(),
445+
terminate: Arc::new(Event::new()),
446+
});
447+
let shared_tracker = tracker.clone();
448+
449+
spawn(async move {
450+
loop {
451+
select! {
452+
_ = async {
453+
shared_tracker.update_available_partitions().await;
454+
sleep(std::time::Duration::from_secs(1)).await;
455+
} => {}
456+
_ = shared_tracker.terminate.listen() => {
457+
break;
458+
},
459+
}
460+
}
461+
});
462+
463+
tracker
464+
}
465+
466+
async fn update_available_partitions(&self) {
467+
let mut available_partitions = vec![];
468+
if let Ok(Some(topic)) = self.topics.lookup_by_key(&self.topic_name).await {
469+
let partition_count = topic.spec().partitions();
470+
// Update the partition count if it has changed
471+
if partition_count
472+
!= self
473+
.partition_count
474+
.load(std::sync::atomic::Ordering::Relaxed)
475+
{
476+
self.partition_count
477+
.store(partition_count, std::sync::atomic::Ordering::Relaxed);
478+
}
479+
480+
for partition_id in 0..partition_count {
481+
if let Ok(Some(partition)) = self
482+
.partitions
483+
.lookup_by_key(&ReplicaKey::new(&self.topic_name, partition_id))
484+
.await
485+
{
486+
if partition.status.is_online() {
487+
available_partitions.push(partition_id);
488+
}
489+
}
490+
}
491+
// Update the shared available partitions if they have changed
492+
{
493+
let read_lock = self.available_partitions.read().await;
494+
if available_partitions == *read_lock {
495+
return; // No change needed
496+
}
497+
}
498+
*self.available_partitions.write().await = available_partitions;
499+
}
500+
}
501+
502+
fn available_partitions(&self) -> Arc<RwLock<Vec<PartitionId>>> {
503+
self.available_partitions.clone()
504+
}
505+
506+
fn partition_count(&self) -> u32 {
507+
self.partition_count
508+
.load(std::sync::atomic::Ordering::Relaxed)
509+
}
510+
}
511+
405512
impl<S> TopicProducer<S>
406513
where
407514
S: SpuPool + Send + Sync + 'static,
@@ -412,12 +519,13 @@ where
412519
config: Arc<TopicProducerConfig>,
413520
metrics: Arc<ClientMetrics>,
414521
) -> Result<Self> {
415-
let topics = spu_pool.topics();
416-
let topic_spec: fluvio_sc_schema::topic::TopicSpec = topics
522+
let topic_store = spu_pool.topics();
523+
let topic_spec = topic_store
417524
.lookup_by_key(&topic)
418525
.await?
419526
.ok_or_else(|| FluvioError::TopicNotFound(topic.to_string()))?
420527
.spec;
528+
421529
let partition_count = topic_spec.partitions();
422530

423531
cfg_if::cfg_if! {
@@ -435,6 +543,9 @@ where
435543
partition_count,
436544
compression,
437545
);
546+
547+
let partitions = spu_pool.partitions().clone();
548+
438549
let producer_pool = ProducerPool::new(
439550
config.clone(),
440551
topic.clone(),
@@ -444,13 +555,21 @@ where
444555
config.callback.clone(),
445556
);
446557

558+
let partition_tracker = PartitionAvailabilityTracker::start(
559+
partition_count,
560+
topic.clone(),
561+
partitions,
562+
spu_pool.topics().clone(),
563+
);
564+
447565
Ok(Self {
448566
inner: Arc::new(InnerTopicProducer {
449567
config,
450568
topic,
451569
spu_pool,
452570
producer_pool: Arc::new(RwLock::new(producer_pool)),
453571
record_accumulator: Arc::new(record_accumulator),
572+
partition_tracker,
454573
metrics: metrics.clone(),
455574
}),
456575
#[cfg(feature = "smartengine")]
@@ -641,11 +760,16 @@ mod tests {
641760
use std::sync::Arc;
642761

643762
use async_trait::async_trait;
644-
use fluvio_protocol::record::RecordKey;
645-
use fluvio_sc_schema::{partition::PartitionSpec, store::MetadataStoreObject, topic::TopicSpec};
763+
use fluvio_future::timer::sleep;
764+
use fluvio_protocol::record::{RecordKey, ReplicaKey};
765+
use fluvio_sc_schema::{
766+
partition::{PartitionSpec},
767+
store::MetadataStoreObject,
768+
topic::TopicSpec,
769+
};
646770
use fluvio_socket::{ClientConfig, SocketError, StreamSocket, VersionedSerialSocket};
647771
use fluvio_stream_dispatcher::metadata::local::LocalMetadataItem;
648-
use fluvio_types::SpuId;
772+
use fluvio_types::{PartitionId, SpuId};
649773

650774
use crate::{
651775
metrics::ClientMetrics,
@@ -656,6 +780,7 @@ mod tests {
656780

657781
struct SpuPoolMock {
658782
topics: StoreContext<TopicSpec>,
783+
partitions: StoreContext<PartitionSpec>,
659784
}
660785

661786
#[async_trait]
@@ -691,7 +816,7 @@ mod tests {
691816
}
692817

693818
fn partitions(&self) -> &StoreContext<PartitionSpec> {
694-
todo!()
819+
&self.partitions
695820
}
696821
}
697822

@@ -708,10 +833,21 @@ mod tests {
708833
(partitions_count, 2, false).into(), // 2 partitions, 2 replicas, not ignore rack
709834
),
710835
];
711-
836+
let partition_2 = vec![
837+
MetadataStoreObject::<PartitionSpec, LocalMetadataItem>::with_spec(
838+
ReplicaKey::new(topic.clone(), 0 as PartitionId),
839+
vec![0, 1].into(),
840+
),
841+
MetadataStoreObject::<PartitionSpec, LocalMetadataItem>::with_spec(
842+
ReplicaKey::new(topic.clone(), 1 as PartitionId),
843+
vec![0, 1].into(),
844+
),
845+
];
712846
let topics = StoreContext::<TopicSpec>::new();
713-
let spu_pool = Arc::new(SpuPoolMock { topics });
847+
let partitions = StoreContext::<PartitionSpec>::new();
848+
let spu_pool = Arc::new(SpuPoolMock { topics, partitions });
714849
spu_pool.topics().store().sync_all(topic_2_partitions).await;
850+
spu_pool.partitions().store().sync_all(partition_2).await;
715851
let producer = TopicProducer::new(topic.clone(), spu_pool.clone(), config, metrics)
716852
.await
717853
.expect("producer");
@@ -750,6 +886,8 @@ mod tests {
750886

751887
spu_pool.topics.store().sync_all(topic_3_partitions).await;
752888

889+
sleep(std::time::Duration::from_secs(2)).await;
890+
753891
let _ = producer
754892
.send(RecordKey::NULL, "789".to_string())
755893
.await

crates/fluvio/src/producer/partitioning.rs

Lines changed: 38 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -24,9 +24,11 @@ pub trait Partitioner {
2424

2525
pub struct PartitionerConfig {
2626
pub partition_count: PartitionCount,
27+
pub available_partitions: Vec<PartitionCount>,
2728
}
2829

2930
impl PartitionerConfig {
31+
/// Creates a new `PartitionerConfig` with the given partition count and available partitions.
3032
pub fn partition_count(&self) -> PartitionCount {
3133
self.partition_count
3234
}
@@ -60,8 +62,12 @@ impl Partitioner for SiphashRoundRobinPartitioner {
6062
None => {
6163
// Atomic increment. This will wrap on overflow, which is fine
6264
// because we are only interested in the modulus anyway
63-
let partition = self.index.fetch_add(1, Ordering::Relaxed);
64-
partition % config.partition_count
65+
let index = self.index.fetch_add(1, Ordering::Relaxed);
66+
if config.available_partitions.is_empty() {
67+
return index % config.partition_count;
68+
}
69+
let partition = index as usize % config.available_partitions.len();
70+
config.available_partitions[partition]
6571
}
6672
}
6773
}
@@ -110,7 +116,10 @@ mod tests {
110116
/// Ensure that feeding keyless records one-at-a-time does not assign the same partition
111117
#[test]
112118
fn test_round_robin_individual() {
113-
let config = PartitionerConfig { partition_count: 3 };
119+
let config = PartitionerConfig {
120+
partition_count: 3,
121+
available_partitions: vec![0, 1, 2],
122+
};
114123
let partitioner = SiphashRoundRobinPartitioner::new();
115124

116125
let key1_partition = partitioner.partition(&config, None, &[]);
@@ -133,7 +142,10 @@ mod tests {
133142

134143
let (tx, rx) = std::sync::mpsc::channel();
135144
let partitioner = Arc::new(SiphashRoundRobinPartitioner::new());
136-
let config = Arc::new(PartitionerConfig { partition_count: 4 });
145+
let config = Arc::new(PartitionerConfig {
146+
partition_count: 4,
147+
available_partitions: vec![0, 1, 2, 3],
148+
});
137149

138150
// We have 5 threads calculating partitions 400 times each for NULL key (aka round-robin).
139151
// This is 20,000 records total, among 4 partitions. If it is evenly distributed like we
@@ -162,4 +174,26 @@ mod tests {
162174
assert_eq!(count, 500);
163175
}
164176
}
177+
178+
#[test]
179+
fn test_available_partitions() {
180+
let config = PartitionerConfig {
181+
partition_count: 3,
182+
available_partitions: vec![0, 2], // Only partitions 0 and 2 are available
183+
};
184+
let partitioner = SiphashRoundRobinPartitioner::new();
185+
186+
let key1_partition = partitioner.partition(&config, None, &[]);
187+
assert_eq!(key1_partition, 0);
188+
let key2_partition = partitioner.partition(&config, None, &[]);
189+
assert_eq!(key2_partition, 2);
190+
let key3_partition = partitioner.partition(&config, None, &[]);
191+
assert_eq!(key3_partition, 0);
192+
let key4_partition = partitioner.partition(&config, None, &[]);
193+
assert_eq!(key4_partition, 2);
194+
let key5_partition = partitioner.partition(&config, None, &[]);
195+
assert_eq!(key5_partition, 0);
196+
let key6_partition = partitioner.partition(&config, None, &[]);
197+
assert_eq!(key6_partition, 2);
198+
}
165199
}

0 commit comments

Comments (0)