[core]Format table #6019
base: master
Conversation
Pull Request Overview
This PR adds JSON and CSV file format support for Paimon tables, enabling tables to read and write data in these formats alongside existing formats such as Parquet and ORC.
- Adds comprehensive JSON format support with configurable timestamp formats, null key handling, and parse error options
- Implements CSV format support with configurable delimiters, quote handling, and header options
- Integrates format table functionality with proper read/write builders and file handling
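To illustrate the kind of configurability described above (delimiters and quote handling), here is a minimal, self-contained sketch of CSV field splitting. This is illustrative only and does not reflect Paimon's actual CSV reader; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Paimon's CSV implementation.
public class SimpleCsvSplitter {
    private final char delimiter;
    private final char quote;

    public SimpleCsvSplitter(char delimiter, char quote) {
        this.delimiter = delimiter;
        this.quote = quote;
    }

    // Splits one CSV line, honoring quoted fields that may contain the delimiter.
    public List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == quote) {
                inQuotes = !inQuotes;
            } else if (c == delimiter && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }
}
```

A real implementation additionally has to deal with escaped quotes, headers, and malformed rows, which is what the configurable options in this PR are for.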
Reviewed Changes
Copilot reviewed 36 out of 36 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory | Registers the new CSV and JSON format factories |
| paimon-format/src/main/java/org/apache/paimon/format/json/* | Complete JSON format implementation with readers, writers, and converters |
| paimon-format/src/main/java/org/apache/paimon/format/csv/* | Complete CSV format implementation with readers, writers, and file handling |
| paimon-core/src/main/java/org/apache/paimon/table/format/* | Format table infrastructure for read/write operations |
| paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java | Enhanced format table interface with batch write and read builders |
```java
// If the underlying stream is already closed, ignore the exception
if (!e.getMessage().contains("Already closed")) {
    throw e;
}
```
String-based exception matching is fragile and error-prone. Consider checking the exception type or using a more robust method to detect already-closed streams.
Suggested change:

```java
// Ignore exceptions during close, as CloseShieldOutputStream prevents closing the underlying stream.
// If logging is desired, it can be done here, e.g.:
// log.warn("Exception during writer close", e);
```
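One way to avoid string matching entirely, in the spirit of this review comment, is a close shield whose `close()` is a no-op on the delegate, so callers can never trigger an "Already closed" error on a shared stream. A minimal sketch (the class name is assumed; Paimon's actual `CloseShieldOutputStream` may differ):

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch of a close shield: close() flushes but never closes the delegate,
// so wrapping writers cannot close the shared underlying stream.
public class NoOpCloseOutputStream extends FilterOutputStream {
    public NoOpCloseOutputStream(OutputStream delegate) {
        super(delegate);
    }

    @Override
    public void close() throws IOException {
        flush(); // flush buffered bytes, but leave the delegate open
    }
}
```

With this wrapper in place, the caller that owns the underlying stream closes it exactly once, and the catch-and-inspect logic becomes unnecessary.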
```java
if (writer != null) {
    try {
        writer.close();
    } catch (IOException e) {
        // If the underlying stream is already closed, ignore the exception
        if (!e.getMessage().contains("Already closed")) {
            throw e;
        }
    }
}
```
String-based exception matching is fragile and error-prone. Consider checking the exception type or using a more robust method to detect already-closed streams.
Suggested change:

```java
if (closed) {
    return;
}
if (writer != null) {
    writer.close();
}
closed = true;
```
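The suggestion above can be made concrete as an idempotent `close()` guarded by a boolean flag, so a second close call is harmless and no exception-message sniffing is needed. A minimal sketch (class and field names assumed, not Paimon's actual writer):

```java
import java.io.IOException;
import java.io.Writer;

// Sketch of an idempotent close: a flag makes repeated close() calls no-ops.
public class GuardedWriter implements AutoCloseable {
    private final Writer writer;
    private boolean closed = false;

    public GuardedWriter(Writer writer) {
        this.writer = writer;
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            return; // already closed: nothing to do, nothing to inspect
        }
        if (writer != null) {
            writer.close();
        }
        closed = true;
    }
}
```

This also matches the `AutoCloseable` contract, which recommends that `close()` be idempotent.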
```java
if (e.getMessage() != null && e.getMessage().contains("Already closed")) {
    // Silently ignore already closed exception
    return;
}
```
String-based exception matching is fragile and error-prone. Consider checking the exception type or using a more robust method to detect already-closed streams.
Suggested change:

```java
// String-based exception matching is fragile and error-prone.
// Consider catching a specific exception type if available.
```
Purpose
Linked issue: close #xxx
Tests
API and Format
Documentation