|
16 | 16 | //! The module handles both V1 and V2 manifest formats transparently.
|
17 | 17 |
|
18 | 18 | use std::{
|
| 19 | + collections::HashSet, |
19 | 20 | io::Read,
|
20 | 21 | iter::{repeat, Map, Repeat, Zip},
|
21 | 22 | sync::Arc,
|
@@ -279,14 +280,12 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
|
279 | 280 | /// * Required metadata fields cannot be serialized
|
280 | 281 | /// * The partition spec ID is not found in table metadata
|
281 | 282 | pub(crate) fn from_existing(
|
282 |
| - bytes: &[u8], |
| 283 | + manifest_reader: impl Iterator<Item = Result<ManifestEntry, Error>>, |
283 | 284 | mut manifest: ManifestListEntry,
|
284 | 285 | schema: &'schema AvroSchema,
|
285 | 286 | table_metadata: &'metadata TableMetadata,
|
286 | 287 | branch: Option<&str>,
|
287 | 288 | ) -> Result<Self, Error> {
|
288 |
| - let manifest_reader = ManifestReader::new(bytes)?; |
289 |
| - |
290 | 289 | let mut writer = AvroWriter::new(schema, Vec::new());
|
291 | 290 |
|
292 | 291 | writer.add_user_metadata(
|
@@ -366,6 +365,131 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
|
366 | 365 | })
|
367 | 366 | }
|
368 | 367 |
|
| 368 | + /// Creates a ManifestWriter from an existing manifest file with selective filtering of entries. |
| 369 | + /// |
| 370 | + /// This method reads an existing manifest file and creates a new writer that includes |
| 371 | + /// only the entries whose file paths are NOT in the provided filter set. Entries that |
| 372 | + /// pass the filter have their status updated to "Existing" and their sequence numbers |
| 373 | + /// and snapshot IDs updated as needed. |
| 374 | + /// |
| 375 | + /// This is particularly useful for overwrite operations where specific files need to be |
| 376 | + /// excluded from the new manifest while preserving other existing entries. |
| 377 | + /// |
| 378 | + /// # Arguments |
| 379 | + /// * `bytes` - The raw bytes of the existing manifest file |
| 380 | + /// * `manifest` - The manifest list entry describing the existing manifest |
| 381 | + /// * `filter` - A set of file paths to exclude from the new manifest |
| 382 | + /// * `schema` - The Avro schema used for serializing manifest entries |
| 383 | + /// * `table_metadata` - The table metadata containing schema and partition information |
| 384 | + /// * `branch` - Optional branch name to get the current schema from |
| 385 | + /// |
| 386 | + /// # Returns |
| 387 | + /// * `Result<Self, Error>` - A new ManifestWriter instance or an error if initialization fails |
| 388 | + /// |
| 389 | + /// # Errors |
| 390 | + /// Returns an error if: |
| 391 | + /// * The existing manifest cannot be read |
| 392 | + /// * The Avro writer cannot be created |
| 393 | + /// * Required metadata fields cannot be serialized |
| 394 | + /// * The partition spec ID is not found in table metadata |
| 395 | + /// |
| 396 | + /// # Behavior |
| 397 | + /// - Entries whose file paths are in the `filter` set are excluded from the new manifest |
| 398 | + /// - Remaining entries have their status set to `Status::Existing` |
| 399 | + /// - Sequence numbers are updated for entries that don't have them |
| 400 | + /// - Snapshot IDs are updated for entries that don't have them |
| 401 | + /// - The manifest's sequence number is incremented |
| 402 | + /// - File counts are updated to reflect the filtered entries |
| 403 | + pub(crate) fn from_existing_with_filter( |
| 404 | + bytes: &[u8], |
| 405 | + mut manifest: ManifestListEntry, |
| 406 | + filter: &HashSet<String>, |
| 407 | + schema: &'schema AvroSchema, |
| 408 | + table_metadata: &'metadata TableMetadata, |
| 409 | + branch: Option<&str>, |
| 410 | + ) -> Result<Self, Error> { |
| 411 | + let manifest_reader = ManifestReader::new(bytes)?; |
| 412 | + |
| 413 | + let mut writer = AvroWriter::new(schema, Vec::new()); |
| 414 | + |
| 415 | + writer.add_user_metadata( |
| 416 | + "format-version".to_string(), |
| 417 | + match table_metadata.format_version { |
| 418 | + FormatVersion::V1 => "1".as_bytes(), |
| 419 | + FormatVersion::V2 => "2".as_bytes(), |
| 420 | + }, |
| 421 | + )?; |
| 422 | + |
| 423 | + writer.add_user_metadata( |
| 424 | + "schema".to_string(), |
| 425 | + match table_metadata.format_version { |
| 426 | + FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into( |
| 427 | + table_metadata.current_schema(branch)?.clone(), |
| 428 | + ))?, |
| 429 | + FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into( |
| 430 | + table_metadata.current_schema(branch)?.clone(), |
| 431 | + ))?, |
| 432 | + }, |
| 433 | + )?; |
| 434 | + |
| 435 | + writer.add_user_metadata( |
| 436 | + "schema-id".to_string(), |
| 437 | + serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?, |
| 438 | + )?; |
| 439 | + |
| 440 | + let spec_id = table_metadata.default_spec_id; |
| 441 | + |
| 442 | + writer.add_user_metadata( |
| 443 | + "partition-spec".to_string(), |
| 444 | + serde_json::to_string( |
| 445 | + &table_metadata |
| 446 | + .partition_specs |
| 447 | + .get(&spec_id) |
| 448 | + .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))? |
| 449 | + .fields(), |
| 450 | + )?, |
| 451 | + )?; |
| 452 | + |
| 453 | + writer.add_user_metadata( |
| 454 | + "partition-spec-id".to_string(), |
| 455 | + serde_json::to_string(&spec_id)?, |
| 456 | + )?; |
| 457 | + |
| 458 | + writer.add_user_metadata("content".to_string(), "data")?; |
| 459 | + |
| 460 | + writer.extend(manifest_reader.filter_map(|entry| { |
| 461 | + let mut entry = entry |
| 462 | + .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string())) |
| 463 | + .unwrap(); |
| 464 | + if !filter.contains(entry.data_file().file_path()) { |
| 465 | + *entry.status_mut() = Status::Existing; |
| 466 | + if entry.sequence_number().is_none() { |
| 467 | + *entry.sequence_number_mut() = Some(manifest.sequence_number); |
| 468 | + } |
| 469 | + if entry.snapshot_id().is_none() { |
| 470 | + *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id); |
| 471 | + } |
| 472 | + Some(to_value(entry).unwrap()) |
| 473 | + } else { |
| 474 | + None |
| 475 | + } |
| 476 | + }))?; |
| 477 | + |
| 478 | + manifest.sequence_number = table_metadata.last_sequence_number + 1; |
| 479 | + |
| 480 | + manifest.existing_files_count = Some( |
| 481 | + manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0), |
| 482 | + ); |
| 483 | + |
| 484 | + manifest.added_files_count = None; |
| 485 | + |
| 486 | + Ok(ManifestWriter { |
| 487 | + manifest, |
| 488 | + writer, |
| 489 | + table_metadata, |
| 490 | + }) |
| 491 | + } |
| 492 | + |
369 | 493 | /// Appends a manifest entry to the manifest file and updates summary statistics.
|
370 | 494 | ///
|
371 | 495 | /// This method adds a new manifest entry while maintaining:
|
|
0 commit comments