Skip to content

Commit fe8b47d

Browse files
authored
Merge pull request #203 from JanKaul/snapshot-partition-bounds
Add function to obtain partition bounds of snapshot
2 parents d7849b0 + 94e17ef commit fe8b47d

File tree

23 files changed

+132
-80
lines changed

23 files changed

+132
-80
lines changed

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,7 @@ pub mod tests {
716716
.with_config("aws_secret_access_key".parse().unwrap(), "password")
717717
.with_config(
718718
"endpoint".parse().unwrap(),
719-
format!("http://{}:{}", localstack_host, localstack_port),
719+
format!("http://{localstack_host}:{localstack_port}"),
720720
)
721721
.with_config("region".parse().unwrap(), "us-east-1")
722722
.with_config("allow_http".parse().unwrap(), "true");

catalogs/iceberg-glue-catalog/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1067,7 +1067,7 @@ pub mod tests {
10671067
.with_config("aws_secret_access_key".parse().unwrap(), "password")
10681068
.with_config(
10691069
"endpoint".parse().unwrap(),
1070-
format!("http://{}:{}", localstack_host, localstack_port),
1070+
format!("http://{localstack_host}:{localstack_port}"),
10711071
)
10721072
.with_config("region".parse().unwrap(), "us-east-1")
10731073
.with_config("allow_http".parse().unwrap(), "true");

catalogs/iceberg-glue-catalog/src/schema.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,11 @@ pub(crate) fn type_to_glue(datatype: &Type) -> Result<String, Error> {
3131
PrimitiveType::String | PrimitiveType::Uuid => Ok("string".to_owned()),
3232
PrimitiveType::Binary | PrimitiveType::Fixed(_) => Ok("binary".to_owned()),
3333
x => Err(Error::InvalidFormat(format!(
34-
"Type {} cannot be converted to glue type",
35-
x
34+
"Type {x} cannot be converted to glue type"
3635
))),
3736
},
3837
x => Err(Error::InvalidFormat(format!(
39-
"Type {} cannot be converted to glue type",
40-
x
38+
"Type {x} cannot be converted to glue type"
4139
))),
4240
}
4341
}

catalogs/iceberg-rest-catalog/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,7 @@ pub mod tests {
730730
.with_config("aws_secret_access_key".parse().unwrap(), "password")
731731
.with_config(
732732
"endpoint".parse().unwrap(),
733-
format!("http://{}:{}", localstack_host, localstack_port),
733+
format!("http://{localstack_host}:{localstack_port}"),
734734
)
735735
.with_config("region".parse().unwrap(), "us-east-1")
736736
.with_config("allow_http".parse().unwrap(), "true");

catalogs/iceberg-sql-catalog/src/lib.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ impl Catalog for SqlCatalog {
340340
let name = identifier.name().to_string();
341341
let metadata_location = metadata_location.to_string();
342342

343-
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&self.pool).await.map_err(Error::from)?;
343+
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
344344
}
345345
self.cache.write().unwrap().insert(
346346
identifier.clone(),
@@ -380,7 +380,7 @@ impl Catalog for SqlCatalog {
380380
let name = identifier.name().to_string();
381381
let metadata_location = metadata_location.to_string();
382382

383-
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&self.pool).await.map_err(Error::from)?;
383+
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
384384
}
385385
self.cache.write().unwrap().insert(
386386
identifier.clone(),
@@ -423,14 +423,14 @@ impl Catalog for SqlCatalog {
423423
let name = identifier.name().to_string();
424424
let metadata_location = metadata_location.to_string();
425425

426-
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut *transaction).await.map_err(Error::from)?;
426+
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&mut *transaction).await.map_err(Error::from)?;
427427

428428
let table_catalog_name = self.name.clone();
429429
let table_namespace = table_identifier.namespace().to_string();
430430
let table_name = table_identifier.name().to_string();
431431
let table_metadata_location = table_metadata_location.to_string();
432432

433-
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",table_catalog_name,table_namespace,table_name, table_metadata_location)).execute(&mut *transaction).await.map_err(Error::from)?;
433+
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{table_catalog_name}', '{table_namespace}', '{table_name}', '{table_metadata_location}');")).execute(&mut *transaction).await.map_err(Error::from)?;
434434

435435
transaction.commit().await.map_err(Error::from)?;
436436
}
@@ -483,7 +483,7 @@ impl Catalog for SqlCatalog {
483483
let metadata_file_location = metadata_location.to_string();
484484
let previous_metadata_file_location = previous_metadata_location.to_string();
485485

486-
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}' and metadata_location = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name, previous_metadata_file_location)).execute(&self.pool).await.map_err(Error::from)?;
486+
sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
487487

488488
self.cache.write().unwrap().insert(
489489
identifier.clone(),
@@ -541,7 +541,7 @@ impl Catalog for SqlCatalog {
541541
let metadata_file_location = metadata_location.to_string();
542542
let previous_metadata_file_location = previous_metadata_location.to_string();
543543

544-
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}' and metadata_location = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name,previous_metadata_file_location)).execute(&self.pool).await.map_err(Error::from)?;
544+
sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
545545
self.cache.write().unwrap().insert(
546546
identifier.clone(),
547547
(metadata_location.clone(), metadata.clone()),
@@ -597,7 +597,7 @@ impl Catalog for SqlCatalog {
597597
let metadata_file_location = metadata_location.to_string();
598598
let previous_metadata_file_location = previous_metadata_location.to_string();
599599

600-
sqlx::query(&format!("update iceberg_tables set metadata_location = '{}', previous_metadata_location = '{}' where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}' and metadata_location = '{}';", metadata_file_location, previous_metadata_file_location,catalog_name,namespace,name, previous_metadata_file_location)).execute(&self.pool).await.map_err(Error::from)?;
600+
sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
601601
self.cache.write().unwrap().insert(
602602
identifier.clone(),
603603
(metadata_location.clone(), metadata.clone()),
@@ -633,7 +633,7 @@ impl Catalog for SqlCatalog {
633633
let name = identifier.name().to_string();
634634
let metadata_location = metadata_location.to_string();
635635

636-
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&self.pool).await.map_err(Error::from)?;
636+
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
637637
}
638638
self.cache.write().unwrap().insert(
639639
identifier.clone(),
@@ -810,17 +810,14 @@ pub mod tests {
810810
.with_config("aws_secret_access_key".parse().unwrap(), "password")
811811
.with_config(
812812
"endpoint".parse().unwrap(),
813-
format!("http://{}:{}", localstack_host, localstack_port),
813+
format!("http://{localstack_host}:{localstack_port}"),
814814
)
815815
.with_config("region".parse().unwrap(), "us-east-1")
816816
.with_config("allow_http".parse().unwrap(), "true");
817817

818818
let iceberg_catalog = Arc::new(
819819
SqlCatalog::new(
820-
&format!(
821-
"postgres://postgres:postgres@{}:{}/postgres",
822-
postgres_host, postgres_port
823-
),
820+
&format!("postgres://postgres:postgres@{postgres_host}:{postgres_port}/postgres"),
824821
"warehouse",
825822
object_store,
826823
)

datafusion_iceberg/src/statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ fn convert_value_to_scalar_value(value: Value) -> Result<ScalarValue, Error> {
165165
)),
166166
x => Err(Error::Conversion(
167167
"Iceberg value".to_string(),
168-
format!("{:?}", x),
168+
format!("{x:?}"),
169169
)),
170170
}
171171
}

datafusion_iceberg/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ fn fake_object_store_url(table_location_url: &str) -> Option<ObjectStoreUrl> {
318318
u.path()
319319
.as_bytes()
320320
.iter()
321-
.map(|b| format!("{:02x}", b))
321+
.map(|b| format!("{b:02x}"))
322322
.collect::<Vec<_>>()
323323
.join("")
324324
)))

datafusion_iceberg/tests/empty_insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub async fn test_empty_insert() {
4848
.with_config("aws_secret_access_key".parse().unwrap(), "password")
4949
.with_config(
5050
"endpoint".parse().unwrap(),
51-
format!("http://{}:{}", localstack_host, localstack_port),
51+
format!("http://{localstack_host}:{localstack_port}"),
5252
)
5353
.with_config("region".parse().unwrap(), "us-east-1")
5454
.with_config("allow_http".parse().unwrap(), "true");

datafusion_iceberg/tests/integration_trino.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use tokio::time::sleep;
2929

3030
fn configuration(host: &str, port: u16) -> Configuration {
3131
Configuration {
32-
base_path: format!("http://{}:{}", host, port),
32+
base_path: format!("http://{host}:{port}"),
3333
user_agent: None,
3434
client: reqwest::Client::new(),
3535
basic_auth: None,
@@ -63,7 +63,7 @@ async fn wait_for_worker(trino_container: &ContainerAsync<GenericImage>, timeout
6363
}
6464
tokio::time::sleep(Duration::from_millis(500)).await;
6565
}
66-
panic!("Trino still not queryable after {:?}", timeout);
66+
panic!("Trino still not queryable after {timeout:?}");
6767
}
6868

6969
#[tokio::test]
@@ -130,17 +130,15 @@ async fn integration_trino_rest() {
130130
writeln!(tmp_file, "iceberg.catalog.type=rest").unwrap();
131131
writeln!(
132132
tmp_file,
133-
"iceberg.rest-catalog.uri=http://{}:{}",
134-
docker_host, rest_port
133+
"iceberg.rest-catalog.uri=http://{docker_host}:{rest_port}"
135134
)
136135
.unwrap();
137136
writeln!(tmp_file, "iceberg.rest-catalog.warehouse=s3://warehouse/").unwrap();
138137
writeln!(tmp_file, "iceberg.file-format=PARQUET").unwrap();
139138
writeln!(tmp_file, "fs.native-s3.enabled=true").unwrap();
140139
writeln!(
141140
tmp_file,
142-
"s3.endpoint=http://{}:{}",
143-
docker_host, localstack_port
141+
"s3.endpoint=http://{docker_host}:{localstack_port}"
144142
)
145143
.unwrap();
146144
writeln!(tmp_file, "s3.path-style-access=true").unwrap();
@@ -179,7 +177,7 @@ async fn integration_trino_rest() {
179177
"iceberg",
180178
"--file",
181179
"/tmp/trino.sql",
182-
&format!("http://{}:{}", docker_host, trino_port),
180+
&format!("http://{docker_host}:{trino_port}"),
183181
])
184182
.with_cmd_ready_condition(CmdWaitFor::exit_code(0)),
185183
)
@@ -191,7 +189,7 @@ async fn integration_trino_rest() {
191189
.with_config("aws_secret_access_key".parse().unwrap(), "password")
192190
.with_config(
193191
"endpoint".parse().unwrap(),
194-
format!("http://{}:{}", localstack_host, localstack_port),
192+
format!("http://{localstack_host}:{localstack_port}"),
195193
)
196194
.with_config("region".parse().unwrap(), "us-east-1")
197195
.with_config("allow_http".parse().unwrap(), "true");
@@ -294,7 +292,7 @@ async fn integration_trino_rest() {
294292
"SELECT sum(amount) FROM iceberg.test.test_orders;",
295293
"--output-format",
296294
"NULL",
297-
&format!("http://{}:{}", docker_host, trino_port),
295+
&format!("http://{docker_host}:{trino_port}"),
298296
])
299297
.with_cmd_ready_condition(CmdWaitFor::exit_code(0)),
300298
)

iceberg-rust-spec/src/arrow/schema.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,7 @@ impl TryFrom<&DataType> for Type {
172172
element: Box::new(field.data_type().try_into()?),
173173
})),
174174
x => Err(Error::NotSupported(format!(
175-
"Arrow datatype {} is not supported.",
176-
x
175+
"Arrow datatype {x} is not supported."
177176
))),
178177
}
179178
}

0 commit comments

Comments
 (0)