diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java index 1e5c10395b9..ecae14b90cc 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java @@ -84,14 +84,45 @@ public static Object[] queryMinMax( }); } + private static String resolveVersion(JdbcConnection jdbc) { + try (Statement statement = jdbc.connection().createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT version()")) { + resultSet.next(); + return resultSet.getString(1); + } catch (Exception e) { + log.info( + "Failed to get PostgreSQL version, fallback to default version: {}", + e.getMessage(), + e); + return ""; + } + } + public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { - // The statement used to get approximate row count which is less - // accurate than COUNT(*), but is more efficient for large table. - final String rowCountQuery = - String.format( - "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", - tableId.table()); + String version = resolveVersion(jdbc); + log.info("Detected PostgreSQL version: {}", version); + + // Determine if the version is greater than or equal to 13 + boolean isVersion13OrAbove = isPostgresVersion13OrAbove(version); + + final String rowCountQuery; + if (isVersion13OrAbove) { + // Query if the version is greater than or equal to 13 + rowCountQuery = + String.format( + "SELECT reltuples FROM pg_class r WHERE (relkind = 'r' OR relkind = 'p') AND relname = '%s';", + tableId.table()); + } else { + // Provide an alternative if the version requirement is not met (i.e., if the version is + // less than 13) + rowCountQuery = + String.format( + "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", + tableId.table()); + } + + log.info("Executing query: {}", rowCountQuery); return jdbc.queryAndMap( rowCountQuery, rs -> { @@ -105,6 +136,27 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) }); } + private static boolean isPostgresVersion13OrAbove(String version) { + // Extracting the major version number from a PostgreSQL version string + if (version == null || version.isEmpty()) { + log.warn("PostgreSQL version is empty or null. Assuming version < 13."); + return false; + } + try { + // "PostgreSQL 13.3 (Ubuntu 13.3-1.pgdg20.04+1)" "13" + String[] parts = version.split(" "); + for (String part : parts) { + if (part.matches("\\d+(\\.\\d+)?")) { // Matching numeric versions + int majorVersion = Integer.parseInt(part.split("\\.")[0]); + return majorVersion >= 13; + } + } + } catch (Exception e) { + log.warn("Failed to parse PostgreSQL version: {}. Assuming version < 13.", version, e); + } + return false; + } + public static Object queryMin( JdbcConnection jdbc, TableId tableId, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index d171d104056..b654d16e5bb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.postgres; -import org.apache.seatunnel.shade.com.google.common.collect.Lists; - import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory; @@ -45,6 +43,7 @@ import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerImageName; +import com.beust.jcommander.internal.Lists; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import lombok.extern.slf4j.Slf4j; @@ -93,9 +92,12 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { private static final String SOURCE_TABLE_1 = "postgres_cdc_table_1"; private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2"; private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3"; + private static final String SOURCE_PARTITIONED_TABLE = "source_partitioned_table"; + private static final String SOURCE_PARTITIONED_TABLE_2023 = "source_partitioned_table_2023"; private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1"; private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2"; private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3"; + private static final String SINK_PARTITIONED_TABLE = "sink_partitioned_table"; private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "full_types_no_primary_key"; @@ -142,6 +144,7 @@ private String driverUrl() { @Override public void startUp() { log.info("The second stage: Starting Postgres containers..."); + POSTGRES_CONTAINER.setPortBindings( Lists.newArrayList( String.format( @@ -193,6 +196,159 @@ public void testMPostgresCdcCheckDataE2e(TestContainer container) { } } + private static String resolveVersion(Connection jdbc) { + try (Statement statement = jdbc.createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT version()")) { + resultSet.next(); + return resultSet.getString(1); + } catch (Exception e) { + log.info( + "Failed to get PostgreSQL version, fallback to default version: {}", + e.getMessage(), + e); + return ""; + } + } + + private static boolean isPostgresVersion13OrAbove(String version) { + + if (version == null || version.isEmpty()) { + log.warn("PostgreSQL version is empty or null. Assuming version < 13."); + return false; + } + try { + String[] parts = version.split(" "); + for (String part : parts) { + if (part.matches("\\d+(\\.\\d+)?")) { + int majorVersion = Integer.parseInt(part.split("\\.")[0]); + return majorVersion >= 13; + } + } + } catch (Exception e) { + log.warn("Failed to parse PostgreSQL version: {}. Assuming version < 13.", version, e); + } + return false; + } + + @TestTemplate + public void testPostgresPartitionedTableE2e(TestContainer container) { + try (Connection connection = getJdbcConnection(); ) { + String version = resolveVersion(connection); + log.info("Detected PostgreSQL version: {}", version); + boolean isVersion13OrAbove = isPostgresVersion13OrAbove(version); + if (isVersion13OrAbove) { + Partition13(container); + } else { + Partition12(container); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void Partition13(TestContainer container) { + try { + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/postgrescdc_to_postgres_with_partition.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SOURCE_PARTITIONED_TABLE)), + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SINK_PARTITIONED_TABLE))); + }); + + // insert update delete + upsertDeleteSourcePartionTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE); + + // stream stage + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SOURCE_PARTITIONED_TABLE)), + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SINK_PARTITIONED_TABLE))); + }); + } finally { + // Clear related content to ensure that multiple operations are not affected + clearTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE); + clearTable(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE); + } + } + + public void Partition12(TestContainer container) { + try { + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/postgrescdc_to_postgres_with_partition12.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SOURCE_PARTITIONED_TABLE_2023)), + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SINK_PARTITIONED_TABLE))); + }); + + // insert update delete + upsertDeleteSourcePartionTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE_2023); + + // stream stage + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SOURCE_PARTITIONED_TABLE_2023)), + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SINK_PARTITIONED_TABLE))); + }); + + } finally { + // Clear related content to ensure that multiple operations are not affected + clearTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE_2023); + clearTable(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE); + } + } + @TestTemplate @DisabledOnContainer( value = {}, @@ -739,6 +895,31 @@ private void upsertDeleteSourceTable(String database, String tableName) { executeSql("UPDATE " + database + "." + tableName + " SET f_big = 10000 where id = 3;"); } + private void upsertDeleteSourcePartionTable(String database, String tableName) { + executeSql( + "INSERT INTO " + + database + + "." + + tableName + + " VALUES (2, 'Sample Data 1', '2023-06-15 10:30:00');"); + + executeSql( + "INSERT INTO " + + database + + "." + + tableName + + " VALUES (3, 'Sample Data 2', '2023-07-20 15:45:00');"); + + executeSql("DELETE FROM " + database + "." + tableName + " WHERE id = 2;"); + + executeSql( + "UPDATE " + + database + + "." + + tableName + + " SET data = 'Updated Data' WHERE id = 3;"); + } + private String getQuerySQL(String database, String tableName) { return String.format(SOURCE_SQL_TEMPLATE, database, tableName); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql index 1372f98a444..fe163405d5f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql @@ -168,6 +168,55 @@ CREATE TABLE sink_postgres_cdc_table_3 PRIMARY KEY (id) ); + + +CREATE TABLE source_partitioned_table +( + id INTEGER NOT NULL, + data TEXT, + event_time TIMESTAMP NOT NULL, + PRIMARY KEY (id, event_time) +) PARTITION BY RANGE (event_time); + + +CREATE TABLE source_partitioned_table_2023 + PARTITION OF source_partitioned_table + FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2024-01-01 00:00:00'); + +CREATE TABLE source_partitioned_table_2024 + PARTITION OF source_partitioned_table + FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2025-01-01 00:00:00'); + + +CREATE TABLE sink_partitioned_table +( + id INTEGER NOT NULL, + data TEXT, + event_time TIMESTAMP NOT NULL, + PRIMARY KEY (id, event_time) +) PARTITION BY RANGE (event_time); + + +CREATE TABLE sink_partitioned_table_2023 + PARTITION OF sink_partitioned_table + FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2024-01-01 00:00:00'); + +CREATE TABLE sink_partitioned_table_2024 + PARTITION OF sink_partitioned_table + FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2025-01-01 00:00:00'); + +ALTER TABLE source_partitioned_table_2023 REPLICA IDENTITY FULL; +ALTER TABLE source_partitioned_table_2024 REPLICA IDENTITY FULL; +ALTER TABLE source_partitioned_table REPLICA IDENTITY FULL; +ALTER TABLE sink_partitioned_table REPLICA IDENTITY FULL; +ALTER TABLE sink_partitioned_table_2023 REPLICA IDENTITY FULL; +ALTER TABLE sink_partitioned_table_2023 REPLICA IDENTITY FULL; + +INSERT INTO source_partitioned_table (id, data, event_time) +VALUES (1, 'Partitioned data 2023', '2023-06-01 12:00:00'), + (2, 'Partitioned data 2024', '2024-03-15 09:30:00'); + + ALTER TABLE postgres_cdc_table_1 REPLICA IDENTITY FULL; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf new file mode 100644 index 00000000000..37ef1bb8903 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # Engine configuration + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 +} + +source { + Postgres-CDC { + plugin_output = "partitioned_table_source" + username = "postgres" + password = "postgres" + database-names = ["postgres_cdc"] + schema-names = ["inventory"] + table-names = ["postgres_cdc.inventory.source_partitioned_table"] + base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + + } +} + +transform { + +} + +sink { + jdbc { + plugin_input = "partitioned_table_source" + url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + driver = "org.postgresql.Driver" + user = "postgres" + password = "postgres" + generate_sink_sql = true + database = postgres_cdc + table = inventory.sink_partitioned_table + + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition12.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition12.conf new file mode 100644 index 00000000000..b6dfbc158cc --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition12.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # Engine configuration + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 +} + +source { + Postgres-CDC { + plugin_output = "partitioned_table_source" + username = "postgres" + password = "postgres" + database-names = ["postgres_cdc"] + schema-names = ["inventory"] + table-names = ["postgres_cdc.inventory.source_partitioned_table_2023"] + base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + + } +} + +transform { + +} + +sink { + jdbc { + plugin_input = "partitioned_table_source" + url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + driver = "org.postgresql.Driver" + user = "postgres" + password = "postgres" + generate_sink_sql = true + database = postgres_cdc + table = inventory.sink_partitioned_table + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode="APPEND_DATA" + } +} \ No newline at end of file