Skip to content

Commit 1f1c5b1

Browse files
committed
fallback to empty mirror for sync datafusion cataloglist
1 parent d1226e6 commit 1f1c5b1

File tree

9 files changed

+40
-20
lines changed

9 files changed

+40
-20
lines changed

datafusion-iceberg-sql/src/context.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ impl IcebergContext {
2828
for (catalog_name, namespace, name) in tables {
2929
let catalog = catalogs
3030
.catalog(catalog_name)
31-
.await
3231
.ok_or(DataFusionError::Internal(format!(
3332
"Catalog {} was not provided",
3433
&catalog_name

datafusion_iceberg/examples/refresh_materialized_view.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub(crate) async fn main() {
2626
.unwrap(),
2727
);
2828

29-
let catalog = catalog_list.catalog("iceberg").await.unwrap();
29+
let catalog = catalog_list.catalog("iceberg").unwrap();
3030

3131
let schema = Schema::builder()
3232
.with_fields(

datafusion_iceberg/src/catalog/catalog.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ impl IcebergCatalog {
1919
})
2020
}
2121

22+
pub fn new_sync(catalog: Arc<dyn Catalog>, branch: Option<&str>) -> Self {
23+
IcebergCatalog {
24+
catalog: Arc::new(Mirror::new_sync(catalog, branch.map(ToOwned::to_owned))),
25+
}
26+
}
27+
2228
pub fn catalog(&self) -> Arc<dyn Catalog> {
2329
self.catalog.catalog()
2430
}

datafusion_iceberg/src/catalog/catalog_list.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use crate::error::Error;
99

1010
use super::catalog::IcebergCatalog;
1111

12-
pub struct IcebergCatalogList(DashMap<String, Arc<dyn CatalogProvider>>);
12+
pub struct IcebergCatalogList {
13+
catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
14+
catalog_list: Arc<dyn CatalogList>,
15+
}
1316

1417
impl IcebergCatalogList {
1518
pub async fn new(catalog_list: Arc<dyn CatalogList>) -> Result<Self, Error> {
@@ -19,7 +22,7 @@ impl IcebergCatalogList {
1922
.then(|x| {
2023
let catalog_list = catalog_list.clone();
2124
async move {
22-
let catalog = catalog_list.catalog(&x).await?;
25+
let catalog = catalog_list.catalog(&x)?;
2326
Some((
2427
x,
2528
Arc::new(IcebergCatalog::new(catalog, None).await.ok()?)
@@ -31,7 +34,10 @@ impl IcebergCatalogList {
3134
.collect::<DashMap<_, _>>()
3235
.await;
3336

34-
Ok(IcebergCatalogList(map))
37+
Ok(IcebergCatalogList {
38+
catalogs: map,
39+
catalog_list,
40+
})
3541
}
3642
}
3743

@@ -41,18 +47,22 @@ impl CatalogProviderList for IcebergCatalogList {
4147
}
4248

4349
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
44-
self.0.get(name).as_deref().cloned()
50+
self.catalogs.get(name).as_deref().cloned().or_else(|| {
51+
self.catalog_list.catalog(name).map(|catalog| {
52+
Arc::new(IcebergCatalog::new_sync(catalog, None)) as Arc<dyn CatalogProvider>
53+
})
54+
})
4555
}
4656

4757
fn catalog_names(&self) -> Vec<String> {
48-
self.0.iter().map(|c| c.key().clone()).collect()
58+
self.catalogs.iter().map(|c| c.key().clone()).collect()
4959
}
5060

5161
fn register_catalog(
5262
&self,
5363
name: String,
5464
catalog: Arc<dyn CatalogProvider>,
5565
) -> Option<Arc<dyn CatalogProvider>> {
56-
self.0.insert(name, catalog)
66+
self.catalogs.insert(name, catalog)
5767
}
5868
}

datafusion_iceberg/src/catalog/mirror.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use dashmap::DashMap;
22
use datafusion::{datasource::TableProvider, error::DataFusionError};
33
use futures::{executor::LocalPool, task::LocalSpawnExt};
4-
use std::ops::Deref;
54
use std::{collections::HashSet, sync::Arc};
65

76
use iceberg_rust::spec::{tabular::TabularMetadata, view_metadata::REF_PREFIX};
@@ -66,6 +65,15 @@ impl Mirror {
6665
branch,
6766
})
6867
}
68+
pub fn new_sync(catalog: Arc<dyn Catalog>, branch: Option<String>) -> Self {
69+
let storage = DashMap::new();
70+
71+
Mirror {
72+
storage,
73+
catalog,
74+
branch,
75+
}
76+
}
6977
/// Lists all tables in the given namespace.
7078
pub fn table_names(&self, namespace: &Namespace) -> Result<Vec<Identifier>, DataFusionError> {
7179
let node = self

datafusion_iceberg/src/materialized_view.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,10 @@ pub async fn refresh_materialized_view(
6262
async move {
6363
let catalog_name = full_identifier.catalog().to_string();
6464
let identifier: Identifier = full_identifier.into();
65-
let catalog = catalog_list
66-
.catalog(&catalog_name)
67-
.await
68-
.ok_or(Error::NotFound(
69-
"Catalog".to_owned(),
70-
catalog_name.to_owned(),
71-
))?;
65+
let catalog = catalog_list.catalog(&catalog_name).ok_or(Error::NotFound(
66+
"Catalog".to_owned(),
67+
catalog_name.to_owned(),
68+
))?;
7269

7370
let tabular = match catalog.load_tabular(&identifier).await? {
7471
Tabular::View(_) => {
@@ -243,7 +240,7 @@ mod tests {
243240
.unwrap(),
244241
);
245242

246-
let catalog = catalog_list.catalog("iceberg").await.unwrap();
243+
let catalog = catalog_list.catalog("iceberg").unwrap();
247244

248245
let schema = Schema::builder()
249246
.with_schema_id(0)

iceberg-rest-catalog/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ impl RestCatalogList {
497497

498498
#[async_trait]
499499
impl CatalogList for RestCatalogList {
500-
async fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
500+
fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
501501
Some(Arc::new(RestCatalog::new(
502502
Some(name),
503503
self.configuration.clone(),

iceberg-rust/src/catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub trait Catalog: Send + Sync + Debug {
109109
#[async_trait::async_trait]
110110
pub trait CatalogList: Send + Sync + Debug {
111111
/// Get catalog from list by name
112-
async fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>>;
112+
fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>>;
113113
/// Get the list of available catalogs
114114
async fn list_catalogs(&self) -> Vec<String>;
115115
}

iceberg-sql-catalog/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,7 @@ impl SqlCatalogList {
711711

712712
#[async_trait]
713713
impl CatalogList for SqlCatalogList {
714-
async fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
714+
fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
715715
Some(Arc::new(SqlCatalog {
716716
name: name.to_owned(),
717717
pool: self.pool.clone(),

0 commit comments

Comments
 (0)