[FLINK-37479][postgres] Add support for PARTITIONED TABLE #4004

Open
wants to merge 6 commits into
base: master
from

Conversation

phamvinh1712
Contributor

@phamvinh1712 phamvinh1712 commented Apr 26, 2025

Add support for Postgres partitioned tables.
2 test cases added:

  • Discover partitioned tables
  • IT test to verify that snapshot and WAL events can be consumed for a partitioned table

@phamvinh1712
Contributor Author

Hello @loserwang1024, could you help review this when you have time? Thank you.

Contributor

@loserwang1024 loserwang1024 left a comment

This PR looks good to me in general; just some minor advice.


// generate WAL
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
Contributor

We could also create a new sub-partition table and insert data into it here. A newly added partition table will also be read.

Contributor Author

@loserwang1024 I've updated the test for this.

@@ -38,7 +38,8 @@ public static List<TableId> listTables(
throws SQLException {

Set<TableId> allTableIds =
jdbc.readTableNames(database, null, null, new String[] {"TABLE"});
Contributor

@loserwang1024 loserwang1024 Apr 27, 2025

I hope we can add a parameter such as partitioned (maybe we can check whether Debezium already has one, so we can reuse it). When this parameter is enabled:

  1. Discover partitioned tables here.
  2. Add publish_via_partition_root=true when creating the publication. We could add an initRootPublication, similar to what io.debezium.connector.postgresql.connection.PostgresReplicationConnection#initPublication does, or just modify that method. (But this class is copied from Debezium, so diverging too much from it is not recommended.)

This change is enough to read partitioned tables now, but the user has to create the publication in advance.
To be honest, I hope Debezium supports this itself. But if it doesn't, we can do it to make things easier for users.

Just my own thoughts.
@phamvinh1712 @leonardBang, what do you think?
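
Point 2 above refers to a publication created with publish_via_partition_root=true, a publication parameter available since PostgreSQL 13. As a minimal sketch, the statement a user would run in advance could be assembled as follows. The class name, the publication name dbz_publication (Debezium's default publication name), and the public.orders table are illustrative assumptions and not part of this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch only: builds a CREATE PUBLICATION statement for parent (root) tables. */
final class PartitionRootPublication {

    // Quote a PostgreSQL identifier: wrap it in double quotes and double any embedded quote.
    static String quoteIdent(String ident) {
        return "\"" + ident.replace("\"", "\"\"") + "\"";
    }

    // Only the parent tables are listed; with publish_via_partition_root = true,
    // changes made in the partitions are published under the parent's identity.
    static String createStatement(String publication, String schema, List<String> parentTables) {
        String tables = parentTables.stream()
                .map(t -> quoteIdent(schema) + "." + quoteIdent(t))
                .collect(Collectors.joining(", "));
        return "CREATE PUBLICATION " + quoteIdent(publication)
                + " FOR TABLE " + tables
                + " WITH (publish_via_partition_root = true)";
    }

    public static void main(String[] args) {
        // Hypothetical names, for illustration only.
        System.out.println(createStatement("dbz_publication", "public", Arrays.asList("orders")));
    }
}
```

The resulting string would be executed once against the database (for example through psql or a JDBC Statement) before the connector starts.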

Contributor Author

@phamvinh1712 phamvinh1712 Apr 27, 2025

I know that Debezium doesn't create the publication with publish_via_partition_root=true, so users need to create it themselves in advance.
I agree that doing it for them would help users, but I'm afraid it would make it harder for us to upgrade the Debezium version (which I believe we will need to do at some point, since flink-cdc is using an old version, Debezium 1.9.6).

Contributor

There is also another concern. If the user specifies the table name as a regular expression, both the parent table and its sub-tables will be captured, and the snapshot data will then be read twice.

Thus, an option can also make users aware of this: if they enable partitioned tables, they should pay attention to it.
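
To make the overlap concrete, here is a small self-contained sketch using java.util.regex. The table names are hypothetical, and the matching is plain full-string regex matching, which only approximates how the connector filters tables.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Sketch only: shows how a broad table regex captures a parent table and its partitions. */
final class TableRegexOverlap {

    // Return the tables whose full name matches the given regular expression.
    static List<String> matching(String regex, List<String> tables) {
        Pattern pattern = Pattern.compile(regex);
        return tables.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical catalog: one parent table, two partitions, one unrelated table.
        List<String> tables = Arrays.asList(
                "public.orders", "public.orders_2024", "public.orders_2025", "public.customers");

        // Broad pattern: matches the parent and both partitions, so the same rows
        // would be snapshotted once via the parent and once via each partition.
        System.out.println(matching("public\\.orders.*", tables));

        // Narrow pattern: matches the parent table only.
        System.out.println(matching("public\\.orders", tables));
    }
}
```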

Contributor Author

Do you have any suggestions for this? Should we add a section to the flink-cdc docs?

@phamvinh1712 phamvinh1712 requested a review from loserwang1024 May 5, 2025 10:14
@loserwang1024
Contributor

loserwang1024 commented May 12, 2025

I have debugged this modification, and it turns out:

jdbc.readTableNames(database, null, null, new String[] {"TABLE"}) returns the sub-partition (leaf) tables:
image

jdbc.readTableNames(database, null, null, new String[] {"PARTITIONED TABLE"}) returns the parent partitioned tables:
image

Thus, if table-name = 'inventory_partitioned.*' is set, both the leaf tables and the parent tables are captured.
image

Thus, even though we cannot set publish_via_partition_root in this PR, an option to enable partitioned tables is needed:

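 Set<TableId> allTableIds;
        if (!partition) {
            // Default: regular tables and leaf partitions only.
            allTableIds =
                    jdbc.readTableNames(database, null, null, new String[] {"TABLE"});
        } else {
            // Opt-in: discover the parent partitioned tables instead.
            allTableIds =
                    jdbc.readTableNames(
                            database, null, null, new String[] {"PARTITIONED TABLE"});
        }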

Otherwise, this PR will affect the current regex logic (duplicated data).
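
Since the screenshots are not reproduced here, the effect of the flag can be sketched against an in-memory stand-in for the catalog. Everything in this example is hypothetical: the CATALOG map replaces the real JDBC metadata lookup, and the table names are invented; only the two table type strings come from the comment above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch only: table discovery by JDBC table type, with a flag choosing the type. */
final class TableTypeDiscoverySketch {

    // Hypothetical stand-in for the database catalog, keyed by JDBC table type.
    static final Map<String, List<String>> CATALOG = new LinkedHashMap<>();

    static {
        // Leaf partitions are reported with type "TABLE".
        CATALOG.put("TABLE", Arrays.asList(
                "inventory_partitioned.products_uk", "inventory_partitioned.products_us"));
        // Parent tables are reported with type "PARTITIONED TABLE".
        CATALOG.put("PARTITIONED TABLE", Arrays.asList("inventory_partitioned.products"));
    }

    // Mirror the either/or choice from the snippet above.
    static String[] tableTypes(boolean partition) {
        return partition ? new String[] {"PARTITIONED TABLE"} : new String[] {"TABLE"};
    }

    static Set<String> discover(boolean partition) {
        Set<String> found = new LinkedHashSet<>();
        for (String type : tableTypes(partition)) {
            found.addAll(CATALOG.getOrDefault(type, Collections.emptyList()));
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println("flag off: " + discover(false)); // leaf partitions only
        System.out.println("flag on:  " + discover(true));  // parent table only
    }
}
```

With an either/or choice like this, a table-name regex can never match a parent and its partitions in the same run, which is the duplication the comment is guarding against.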

@github-actions github-actions bot added the docs Improvements or additions to documentation label May 26, 2025
@phamvinh1712
Contributor Author

Hi @loserwang1024, I've updated the PR to add this feature behind a boolean flag (default false), so it won't affect existing users. I also added a note to the docs that users who opt in to this feature should create the PUBLICATION beforehand and avoid matching both parent and child tables in the table list. Please let me know what you think.

@lvyanquan lvyanquan added the 3.5 label May 27, 2025
@loserwang1024
Contributor

I agree with you in general. Maybe just a minor discussion on the option name: what about scan.publish-via-partition-root.enabled? @leonardBang @ruanhang1993, WDYT?

      <td>include-partitioned-table</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">false</td>
      <td>Boolean</td>
      <td>
        Whether to enable reading partitioned table.<br>
        If enabled:
          (1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true
          (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.
      </td>
    </tr>

@phamvinh1712
Contributor Author

Hi @loserwang1024, I've updated the option name based on your suggestion.

Contributor

@loserwang1024 loserwang1024 left a comment

LGTM!
@leonardBang @ruanhang1993, does anyone else have advice on the config name?

@ruanhang1993
Contributor

Hi @phamvinh1712 @loserwang1024, sorry for my late response.

IMO, the new option name scan.publish-via-partition-root.enabled is difficult to understand for users who are not familiar with Postgres. Actually, I am one of them. ;)

The first time I saw this option name, I was confused and did not know when I should turn it on.
I think the option name could be something like scan.include-partitioned-tables.enabled. WDYT?

@leonardBang
Contributor

scan.include-partitioned-tables.enabled.

+1 for this option name. I think it's better not to expose too many underlying concepts to users, especially since many Flink CDC users are not familiar with PG internals.

@loserwang1024
Contributor

I agree with you guys. It seems that I was thinking as a Postgres connector developer rather than as a normal user.
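
For readers skimming the thread, a source table definition using the option name favored above might look roughly like the following sketch, which assembles a Flink SQL DDL string in Java. The table schema, connection values, and slot name are placeholders, and scan.include-partitioned-tables.enabled is used as proposed in this conversation; check the connector documentation for the name that was finally released.

```java
/** Sketch only: assembles a Flink SQL DDL for a postgres-cdc source reading a partitioned table. */
final class PartitionedTableSourceDdl {

    static String ddl(boolean includePartitionedTables) {
        return String.join("\n",
                "CREATE TABLE orders_source (",
                "  id INT,",
                "  region STRING,",
                "  PRIMARY KEY (id, region) NOT ENFORCED",
                ") WITH (",
                "  'connector' = 'postgres-cdc',",
                "  'hostname' = 'localhost',",
                "  'port' = '5432',",
                "  'username' = 'postgres',",
                "  'password' = 'postgres',",
                "  'database-name' = 'postgres',",
                "  'schema-name' = 'public',",
                // Match the parent table only, to avoid reading snapshot data twice.
                "  'table-name' = 'orders',",
                "  'slot.name' = 'flink_orders_slot',",
                "  'decoding.plugin.name' = 'pgoutput',",
                // Option name as proposed in this thread (assumption).
                "  'scan.include-partitioned-tables.enabled' = '" + includePartitionedTables + "'",
                ")");
    }

    public static void main(String[] args) {
        System.out.println(ddl(true));
    }
}
```

The publication itself still has to exist beforehand with publish_via_partition_root=true, as noted in the option description earlier in the thread.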

Labels
3.5 docs Improvements or additions to documentation postgres-cdc-connector reviewed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants