Skip to content

Commit b248b1f

Browse files
committed
use connection pool for sql catalog
1 parent 546199d commit b248b1f

File tree

1 file changed

+37
-51
lines changed

1 file changed

+37
-51
lines changed

iceberg-sql-catalog/src/lib.rs

Lines changed: 37 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc};
22

33
use async_trait::async_trait;
44
use dashmap::DashMap;
5-
use futures::lock::Mutex;
65
use iceberg_rust::{
76
catalog::{
87
bucket::Bucket,
@@ -30,8 +29,9 @@ use iceberg_rust::{
3029
};
3130
use object_store::ObjectStore;
3231
use sqlx::{
33-
any::{install_default_drivers, AnyRow},
34-
AnyConnection, Connection, Row,
32+
any::{install_default_drivers, AnyPoolOptions, AnyRow},
33+
pool::PoolOptions,
34+
AnyPool, Row,
3535
};
3636
use uuid::Uuid;
3737

@@ -40,7 +40,7 @@ use crate::error::Error;
4040
#[derive(Debug)]
4141
pub struct SqlCatalog {
4242
name: String,
43-
connection: Arc<Mutex<AnyConnection>>,
43+
pool: AnyPool,
4444
object_store: Arc<dyn ObjectStore>,
4545
cache: Arc<DashMap<Identifier, (String, TabularMetadata)>>,
4646
}
@@ -55,7 +55,13 @@ impl SqlCatalog {
5555
) -> Result<Self, Error> {
5656
install_default_drivers();
5757

58-
let mut pool = AnyConnection::connect(&url).await?;
58+
let mut options = PoolOptions::new();
59+
60+
if url.starts_with("sqlite") {
61+
options = options.max_connections(1);
62+
}
63+
64+
let pool = AnyPoolOptions::connect(options, &url).await?;
5965

6066
sqlx::query(
6167
"create table if not exists iceberg_tables (
@@ -67,7 +73,7 @@ impl SqlCatalog {
6773
primary key (catalog_name, table_namespace, table_name)
6874
);",
6975
)
70-
.execute(&mut pool)
76+
.execute(&pool)
7177
.await?;
7278

7379
sqlx::query(
@@ -79,21 +85,21 @@ impl SqlCatalog {
7985
primary key (catalog_name, namespace, property_key)
8086
);",
8187
)
82-
.execute(&mut pool)
88+
.execute(&pool)
8389
.await
8490
.map_err(Error::from)?;
8591

8692
Ok(SqlCatalog {
8793
name: name.to_owned(),
88-
connection: Arc::new(Mutex::new(pool)),
94+
pool,
8995
object_store,
9096
cache: Arc::new(DashMap::new()),
9197
})
9298
}
9399

94100
pub fn catalog_list(&self) -> Arc<SqlCatalogList> {
95101
Arc::new(SqlCatalogList {
96-
connection: self.connection.clone(),
102+
pool: self.pool.clone(),
97103
object_store: self.object_store.clone(),
98104
})
99105
}
@@ -169,8 +175,7 @@ impl Catalog for SqlCatalog {
169175
let namespace = namespace.to_string();
170176

171177
let rows = {
172-
let mut connection = self.connection.lock().await;
173-
sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&mut *connection).await.map_err(Error::from)?
178+
sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&self.pool).await.map_err(Error::from)?
174179
};
175180
let iter = rows.iter().map(query_map);
176181

@@ -188,12 +193,11 @@ impl Catalog for SqlCatalog {
188193
let name = self.name.clone();
189194

190195
let rows = {
191-
let mut connection = self.connection.lock().await;
192196
sqlx::query(&format!(
193197
"select distinct table_namespace from iceberg_tables where catalog_name = '{}';",
194198
&name
195199
))
196-
.fetch_all(&mut *connection)
200+
.fetch_all(&self.pool)
197201
.await
198202
.map_err(Error::from)?
199203
};
@@ -215,10 +219,9 @@ impl Catalog for SqlCatalog {
215219
let name = identifier.name().to_string();
216220

217221
let rows = {
218-
let mut connection = self.connection.lock().await;
219222
sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
220223
&namespace,
221-
&name)).fetch_all(&mut *connection).await.map_err(Error::from)?
224+
&name)).fetch_all(&self.pool).await.map_err(Error::from)?
222225
};
223226
let mut iter = rows.iter().map(query_map);
224227

@@ -230,10 +233,9 @@ impl Catalog for SqlCatalog {
230233
let name = identifier.name().to_string();
231234

232235
{
233-
let mut connection = self.connection.lock().await;
234236
sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
235237
&namespace,
236-
&name)).execute(&mut *connection).await.map_err(Error::from)?
238+
&name)).execute(&self.pool).await.map_err(Error::from)?
237239
};
238240
Ok(())
239241
}
@@ -243,10 +245,9 @@ impl Catalog for SqlCatalog {
243245
let name = identifier.name().to_string();
244246

245247
{
246-
let mut connection = self.connection.lock().await;
247248
sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
248249
&namespace,
249-
&name)).execute(&mut *connection).await.map_err(Error::from)?
250+
&name)).execute(&self.pool).await.map_err(Error::from)?
250251
};
251252
Ok(())
252253
}
@@ -256,10 +257,9 @@ impl Catalog for SqlCatalog {
256257
let name = identifier.name().to_string();
257258

258259
{
259-
let mut connection = self.connection.lock().await;
260260
sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
261261
&namespace,
262-
&name)).execute(&mut *connection).await.map_err(Error::from)?
262+
&name)).execute(&self.pool).await.map_err(Error::from)?
263263
};
264264
Ok(())
265265
}
@@ -273,10 +273,9 @@ impl Catalog for SqlCatalog {
273273
let name = identifier.name().to_string();
274274

275275
let row = {
276-
let mut connection = self.connection.lock().await;
277276
sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
278277
&namespace,
279-
&name)).fetch_one(&mut *connection).await.map_err(Error::from)?
278+
&name)).fetch_one(&self.pool).await.map_err(Error::from)?
280279
};
281280
let row = query_map(&row).map_err(Error::from)?;
282281

@@ -331,9 +330,7 @@ impl Catalog for SqlCatalog {
331330
let name = identifier.name().to_string();
332331
let metadata_location = metadata_location.to_string();
333332

334-
let mut connection = self.connection.lock().await;
335-
336-
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut *connection).await.map_err(Error::from)?;
333+
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&self.pool).await.map_err(Error::from)?;
337334
}
338335
self.clone()
339336
.load_tabular(&identifier)
@@ -373,9 +370,7 @@ impl Catalog for SqlCatalog {
373370
let name = identifier.name().to_string();
374371
let metadata_location = metadata_location.to_string();
375372

376-
let mut connection = self.connection.lock().await;
377-
378-
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut *connection).await.map_err(Error::from)?;
373+
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&self.pool).await.map_err(Error::from)?;
379374
}
380375
self.clone()
381376
.load_tabular(&identifier)
@@ -422,8 +417,7 @@ impl Catalog for SqlCatalog {
422417
)
423418
.await?;
424419
{
425-
let mut connection = self.connection.lock().await;
426-
let mut transaction = connection.begin().await.map_err(Error::from)?;
420+
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
427421
let catalog_name = self.name.clone();
428422
let namespace = identifier.namespace().to_string();
429423
let name = identifier.name().to_string();
@@ -493,8 +487,7 @@ impl Catalog for SqlCatalog {
493487
let previous_metadata_file_location = previous_metadata_location.to_string();
494488

495489
{
496-
let mut connection = self.connection.lock().await;
497-
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name)).execute(&mut *connection).await.map_err(Error::from)?
490+
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name)).execute(&self.pool).await.map_err(Error::from)?
498491
};
499492
}
500493
}
@@ -557,8 +550,7 @@ impl Catalog for SqlCatalog {
557550
let previous_metadata_file_location = previous_metadata_location.to_string();
558551

559552
{
560-
let mut connection = self.connection.lock().await;
561-
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name)).execute(&mut *connection).await.map_err(Error::from)?
553+
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name)).execute(&self.pool).await.map_err(Error::from)?
562554
};
563555
}
564556
}
@@ -619,8 +611,7 @@ impl Catalog for SqlCatalog {
619611
let previous_metadata_file_location = previous_metadata_location.to_string();
620612

621613
{
622-
let mut connection = self.connection.lock().await;
623-
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name)).execute(&mut *connection).await.map_err(Error::from)?
614+
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name)).execute(&self.pool).await.map_err(Error::from)?
624615
};
625616
}
626617
}
@@ -678,8 +669,7 @@ impl Catalog for SqlCatalog {
678669
let metadata_location = metadata_location.to_string();
679670

680671
{
681-
let mut connection = self.connection.lock().await;
682-
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut *connection).await.map_err(Error::from)?
672+
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&self.pool).await.map_err(Error::from)?
683673
};
684674
}
685675
self.clone()
@@ -702,7 +692,7 @@ impl SqlCatalog {
702692
pub fn duplicate(&self, name: &str) -> Self {
703693
Self {
704694
name: name.to_owned(),
705-
connection: self.connection.clone(),
695+
pool: self.pool.clone(),
706696
object_store: self.object_store.clone(),
707697
cache: Arc::new(DashMap::new()),
708698
}
@@ -711,15 +701,15 @@ impl SqlCatalog {
711701

712702
#[derive(Debug)]
713703
pub struct SqlCatalogList {
714-
connection: Arc<Mutex<AnyConnection>>,
704+
pool: AnyPool,
715705
object_store: Arc<dyn ObjectStore>,
716706
}
717707

718708
impl SqlCatalogList {
719709
pub async fn new(url: &str, object_store: Arc<dyn ObjectStore>) -> Result<Self, Error> {
720710
install_default_drivers();
721711

722-
let mut connection = AnyConnection::connect(&url).await?;
712+
let pool = AnyPoolOptions::connect(PoolOptions::new().max_connections(1), &url).await?;
723713

724714
sqlx::query(
725715
"create table if not exists iceberg_tables (
@@ -731,7 +721,7 @@ impl SqlCatalogList {
731721
primary key (catalog_name, table_namespace, table_name)
732722
);",
733723
)
734-
.execute(&mut connection)
724+
.execute(&pool)
735725
.await?;
736726

737727
sqlx::query(
@@ -743,14 +733,11 @@ impl SqlCatalogList {
743733
primary key (catalog_name, namespace, property_key)
744734
);",
745735
)
746-
.execute(&mut connection)
736+
.execute(&pool)
747737
.await
748738
.map_err(Error::from)?;
749739

750-
Ok(SqlCatalogList {
751-
connection: Arc::new(Mutex::new(connection)),
752-
object_store,
753-
})
740+
Ok(SqlCatalogList { pool, object_store })
754741
}
755742
}
756743

@@ -759,16 +746,15 @@ impl CatalogList for SqlCatalogList {
759746
async fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
760747
Some(Arc::new(SqlCatalog {
761748
name: name.to_owned(),
762-
connection: self.connection.clone(),
749+
pool: self.pool.clone(),
763750
object_store: self.object_store.clone(),
764751
cache: Arc::new(DashMap::new()),
765752
}))
766753
}
767754
async fn list_catalogs(&self) -> Vec<String> {
768755
let rows = {
769-
let mut connection = self.connection.lock().await;
770756
sqlx::query("select distinct catalog_name from iceberg_tables;")
771-
.fetch_all(&mut *connection)
757+
.fetch_all(&self.pool)
772758
.await
773759
.map_err(Error::from)
774760
.unwrap_or_default()

0 commit comments

Comments
 (0)