From 478e6a3e1802d751c745925ca1814e8b891becbb Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 29 Dec 2024 13:03:49 +0800 Subject: [PATCH 01/24] [connector-cdc-postgres] support read partition table --- .../connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java index 1e5c10395b9..d79fe446857 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java @@ -90,7 +90,7 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) // accurate than COUNT(*), but is more efficient for large table. final String rowCountQuery = String.format( - "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", + "SELECT reltuples FROM pg_class r WHERE (relkind = 'r' or relkind = 'p') AND relname = '%s';", tableId.table()); return jdbc.queryAndMap( rowCountQuery, From 9cff46a1c45f9062a3b13ae9e4b12fa9ebf89b35 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 29 Dec 2024 17:24:50 +0800 Subject: [PATCH 02/24] [connector-cdc-postgres] support read partition table --- .../seatunnel/cdc/postgres/PostgresCDCIT.java | 41 +++++++++++++ .../src/test/resources/ddl/inventory.sql | 47 +++++++++++++++ ...ostgrescdc_to_postgres_with_partition.conf | 59 +++++++++++++++++++ 3 files changed, 147 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index d171d104056..7c1a92e2981 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -93,9 +93,11 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { private static final String SOURCE_TABLE_1 = "postgres_cdc_table_1"; private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2"; private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3"; + private static final String SOURCE_PARTITIONED_TABLE = "source_partitioned_table"; private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1"; private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2"; private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3"; + private static final String SINK_PARTITIONED_TABLE = "sink_partitioned_table"; private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "full_types_no_primary_key"; @@ -193,6 +195,45 @@ public void testMPostgresCdcCheckDataE2e(TestContainer container) { } } + @TestTemplate + public void testPostgresPartitionedTableE2e(TestContainer container) { + + try { + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/postgrescdc_to_postgres_with_partition.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE)), + query(getQuerySQL(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE))); + }); + + // insert update delete + upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE); + + // stream stage + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE)), + query(getQuerySQL(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE))); + }); + } finally { + // Clear related content to ensure that multiple operations are not affected + clearTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE); + clearTable(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE); + } + } @TestTemplate @DisabledOnContainer( value = {}, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql index 1372f98a444..161493a73e9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql @@ -168,6 +168,53 @@ CREATE TABLE sink_postgres_cdc_table_3 PRIMARY KEY (id) ); + +-- 创建源分区表 +CREATE TABLE source_partitioned_table +( + id INTEGER NOT NULL, + data TEXT, + event_time TIMESTAMP NOT NULL, + PRIMARY KEY (id, event_time) +) PARTITION BY RANGE (event_time); + +-- 创建源分区 +CREATE TABLE source_partitioned_table_2023 + PARTITION OF source_partitioned_table + FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2024-01-01 00:00:00'); + +CREATE TABLE source_partitioned_table_2024 + PARTITION OF source_partitioned_table + FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2025-01-01 00:00:00'); + +-- 创建目标分区表 +CREATE TABLE sink_partitioned_table +( + id INTEGER NOT NULL, + data TEXT, + event_time TIMESTAMP NOT NULL, + PRIMARY KEY (id, event_time) +) PARTITION BY RANGE (event_time); + +-- 创建目标分区 +CREATE TABLE sink_partitioned_table_2023 + PARTITION OF sink_partitioned_table + FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2024-01-01 00:00:00'); + +CREATE TABLE sink_partitioned_table_2024 + PARTITION OF sink_partitioned_table + FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2025-01-01 00:00:00'); + +-- 设置复制标识,支持 PostgreSQL CDC +ALTER TABLE source_partitioned_table REPLICA IDENTITY FULL; +ALTER TABLE sink_partitioned_table REPLICA IDENTITY FULL; + +-- 插入测试数据到源分区表 +INSERT INTO source_partitioned_table (id, data, event_time) +VALUES (1, 'Partitioned data 2023', '2023-06-01 12:00:00'), + (2, 'Partitioned data 2024', '2024-03-15 09:30:00'); + + ALTER TABLE postgres_cdc_table_1 REPLICA IDENTITY FULL; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf new file mode 100644 index 00000000000..cefe1f8feee --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # Engine configuration + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 +} + +source { + Postgres-CDC { + result_table_name = "partitioned_table_source" + username = "postgres" + password = "postgres" + database-names = ["postgres_cdc"] + schema-names = ["inventory"] + table-names = ["postgres_cdc.inventory.source_partitioned_table"] + base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + + } +} + +transform { + +} + +sink { + jdbc { + source_table_name = "partitioned_table_source" + url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + driver = "org.postgresql.Driver" + user = "postgres" + password = "postgres" + generate_sink_sql = true + database = postgres_cdc + table = inventory.sink_partitioned_table + + } +} \ No newline at end of file From 357c77af3eaeea22f22ff9305291d42e3128b521 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Mon, 30 Dec 2024 09:38:54 +0800 Subject: [PATCH 03/24] [connector-cdc-postgres] support read partition table --- .../seatunnel/cdc/postgres/PostgresCDCIT.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index 7c1a92e2981..36bdc49e453 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -213,8 +213,14 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { .untilAsserted( () -> { Assertions.assertIterableEquals( - query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE)), - query(getQuerySQL(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE))); + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SOURCE_PARTITIONED_TABLE)), + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SINK_PARTITIONED_TABLE))); }); // insert update delete @@ -225,8 +231,14 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { .untilAsserted( () -> { Assertions.assertIterableEquals( - query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE)), - query(getQuerySQL(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE))); + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SOURCE_PARTITIONED_TABLE)), + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SINK_PARTITIONED_TABLE))); }); } finally { // Clear related content to ensure that multiple operations are not affected @@ -234,6 +246,7 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { clearTable(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE); } } + @TestTemplate @DisabledOnContainer( value = {}, From e3bb350732a2958247f8eb7237d45a680c036291 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Mon, 30 Dec 2024 10:12:12 +0800 Subject: [PATCH 04/24] [connector-cdc-postgres] support read partition table --- .../resources/postgrescdc_to_postgres_with_partition.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf index cefe1f8feee..37ef1bb8903 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition.conf @@ -29,7 +29,7 @@ env { source { Postgres-CDC { - result_table_name = "partitioned_table_source" + plugin_output = "partitioned_table_source" username = "postgres" password = "postgres" database-names = ["postgres_cdc"] @@ -46,7 +46,7 @@ transform { sink { jdbc { - source_table_name = "partitioned_table_source" + plugin_input = "partitioned_table_source" url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" driver = "org.postgresql.Driver" user = "postgres" From 93f23ade05ec86610ef82b83a3c53f03a21f4c72 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Mon, 30 Dec 2024 14:00:10 +0800 Subject: [PATCH 05/24] [connector-cdc-postgres] support read partition table --- .../seatunnel/cdc/postgres/PostgresCDCIT.java | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index 36bdc49e453..e3f3ef02f63 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -224,7 +224,7 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { }); // insert update delete - upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE); + upsertDeleteSourcePartionTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE); // stream stage await().atMost(60000, TimeUnit.MILLISECONDS) @@ -793,6 +793,39 @@ private void upsertDeleteSourceTable(String database, String tableName) { executeSql("UPDATE " + database + "." + tableName + " SET f_big = 10000 where id = 3;"); } + private void upsertDeleteSourcePartionTable(String database, String tableName) { + // 插入新记录到 source_partitioned_table + executeSql( + "INSERT INTO " + + database + + "." + + tableName + + " VALUES (1, 'Sample Data 1', '2023-06-15 10:30:00');"); + + executeSql( + "INSERT INTO " + + database + + "." + + tableName + + " VALUES (2, 'Sample Data 2', '2023-07-20 15:45:00');"); + + // 删除指定记录 + executeSql( + "DELETE FROM " + + database + + "." + + tableName + + " WHERE id = 1 AND event_time = '2023-06-15 10:30:00';"); + + // 更新指定记录 + executeSql( + "UPDATE " + + database + + "." + + tableName + + " SET data = 'Updated Data' WHERE id = 2 AND event_time = '2023-07-20 15:45:00';"); + } + private String getQuerySQL(String database, String tableName) { return String.format(SOURCE_SQL_TEMPLATE, database, tableName); } From 77271ba002aa2871bd45f05b6e86cd454418db21 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Mon, 30 Dec 2024 14:02:15 +0800 Subject: [PATCH 06/24] [connector-cdc-postgres] support read partition table --- .../connectors/seatunnel/cdc/postgres/PostgresCDCIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index e3f3ef02f63..ab53a11d7a1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -800,14 +800,14 @@ private void upsertDeleteSourcePartionTable(String database, String tableName) { + database + "." + tableName - + " VALUES (1, 'Sample Data 1', '2023-06-15 10:30:00');"); + + " VALUES (3, 'Sample Data 1', '2023-06-15 10:30:00');"); executeSql( "INSERT INTO " + database + "." + tableName - + " VALUES (2, 'Sample Data 2', '2023-07-20 15:45:00');"); + + " VALUES (4, 'Sample Data 2', '2023-07-20 15:45:00');"); // 删除指定记录 executeSql( @@ -815,7 +815,7 @@ private void upsertDeleteSourcePartionTable(String database, String tableName) { + database + "." + tableName - + " WHERE id = 1 AND event_time = '2023-06-15 10:30:00';"); + + " WHERE id = 3 AND event_time = '2023-06-15 10:30:00';"); // 更新指定记录 executeSql( @@ -823,7 +823,7 @@ private void upsertDeleteSourcePartionTable(String database, String tableName) { + database + "." + tableName - + " SET data = 'Updated Data' WHERE id = 2 AND event_time = '2023-07-20 15:45:00';"); + + " SET data = 'Updated Data' WHERE id = 4 AND event_time = '2023-07-20 15:45:00';"); } private String getQuerySQL(String database, String tableName) { From d22323d05e098b0b9dd2da8a4af97cab8b6bf535 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Mon, 30 Dec 2024 14:17:10 +0800 Subject: [PATCH 07/24] [connector-cdc-postgres] support read partition table --- .../connectors/seatunnel/cdc/postgres/PostgresCDCIT.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index ab53a11d7a1..0c06013576b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -794,7 +794,6 @@ private void upsertDeleteSourceTable(String database, String tableName) { } private void upsertDeleteSourcePartionTable(String database, String tableName) { - // 插入新记录到 source_partitioned_table executeSql( "INSERT INTO " + database @@ -809,7 +808,6 @@ private void upsertDeleteSourcePartionTable(String database, String tableName) { + tableName + " VALUES (4, 'Sample Data 2', '2023-07-20 15:45:00');"); - // 删除指定记录 executeSql( "DELETE FROM " + database @@ -817,7 +815,6 @@ private void upsertDeleteSourcePartionTable(String database, String tableName) { + tableName + " WHERE id = 3 AND event_time = '2023-06-15 10:30:00';"); - // 更新指定记录 executeSql( "UPDATE " + database From 39b4f9182f9a72e0df690c9e1f36790398cdd8ae Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Mon, 30 Dec 2024 22:07:18 +0800 Subject: [PATCH 08/24] [connector-cdc-postgres] support read partition table --- .../seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index b56930303d7..adcafabe1de 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -251,7 +251,7 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta if (useTableStats) { String rowCountQuery = String.format( - "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", + "SELECT reltuples FROM pg_class r WHERE (relkind = 'r' or relkind = 'p') AND relname = '%s';", table.getTablePath().getTableName()); try (Statement stmt = connection.createStatement()) { log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); From 8a4127752f8a366d7524602c8121a48c97580dfa Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Thu, 2 Jan 2025 16:29:06 +0800 Subject: [PATCH 09/24] [connector-cdc-postgres] support read partition table --- .../src/test/resources/ddl/inventory.sql | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql index 161493a73e9..959a8a7ec4d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql @@ -169,7 +169,7 @@ CREATE TABLE sink_postgres_cdc_table_3 ); --- 创建源分区表 + CREATE TABLE source_partitioned_table ( id INTEGER NOT NULL, @@ -178,7 +178,7 @@ CREATE TABLE source_partitioned_table PRIMARY KEY (id, event_time) ) PARTITION BY RANGE (event_time); --- 创建源分区 + CREATE TABLE source_partitioned_table_2023 PARTITION OF source_partitioned_table FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2024-01-01 00:00:00'); @@ -187,7 +187,7 @@ CREATE TABLE source_partitioned_table_2024 PARTITION OF source_partitioned_table FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2025-01-01 00:00:00'); --- 创建目标分区表 + CREATE TABLE sink_partitioned_table ( id INTEGER NOT NULL, @@ -196,7 +196,7 @@ CREATE TABLE sink_partitioned_table PRIMARY KEY (id, event_time) ) PARTITION BY RANGE (event_time); --- 创建目标分区 + CREATE TABLE sink_partitioned_table_2023 PARTITION OF sink_partitioned_table FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2024-01-01 00:00:00'); @@ -205,11 +205,11 @@ CREATE TABLE sink_partitioned_table_2024 PARTITION OF sink_partitioned_table FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2025-01-01 00:00:00'); --- 设置复制标识,支持 PostgreSQL CDC + ALTER TABLE source_partitioned_table REPLICA IDENTITY FULL; ALTER TABLE sink_partitioned_table REPLICA IDENTITY FULL; --- 插入测试数据到源分区表 + INSERT INTO source_partitioned_table (id, data, event_time) VALUES (1, 'Partitioned data 2023', '2023-06-01 12:00:00'), (2, 'Partitioned data 2024', '2024-03-15 09:30:00'); From cb6b56ac3ee328cace94deacb6cae7379337f945 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Thu, 9 Jan 2025 20:40:38 +0800 Subject: [PATCH 10/24] [connector-cdc-postgres] support read partition table --- .../cdc/postgres/utils/PostgresUtils.java | 63 +++++++++++++++++-- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java index d79fe446857..1d15494d644 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java @@ -84,14 +84,44 @@ public static Object[] queryMinMax( }); } + private static String resolveVersion(JdbcConnection jdbc) { + try (Statement statement = jdbc.connection().createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT version()")) { + resultSet.next(); + return resultSet.getString(1); + } catch (Exception e) { + log.info( + "Failed to get PostgreSQL version, fallback to default version: {}", + e.getMessage(), + e); + return ""; + } + } + public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { - // The statement used to get approximate row count which is less - // accurate than COUNT(*), but is more efficient for large table. - final String rowCountQuery = - String.format( - "SELECT reltuples FROM pg_class r WHERE (relkind = 'r' or relkind = 'p') AND relname = '%s';", - tableId.table()); + String version = resolveVersion(jdbc); + log.info("Detected PostgreSQL version: {}", version); + + // 判断版本是否大于等于13 + boolean isVersion13OrAbove = isPostgresVersion13OrAbove(version); + + final String rowCountQuery; + if (isVersion13OrAbove) { + // 大于等于 13 的查询 + rowCountQuery = + String.format( + "SELECT reltuples FROM pg_class r WHERE (relkind = 'r' OR relkind = 'p') AND relname = '%s';", + tableId.table()); + } else { + // 低于 13 的查询或替代方案 + rowCountQuery = + String.format( + "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", + tableId.table()); + } + + log.info("Executing query: {}", rowCountQuery); return jdbc.queryAndMap( rowCountQuery, rs -> { @@ -105,6 +135,27 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) }); } + private static boolean isPostgresVersion13OrAbove(String version) { + // 从 PostgreSQL 版本字符串中提取主要版本号 + if (version == null || version.isEmpty()) { + log.warn("PostgreSQL version is empty or null. Assuming version < 13."); + return false; + } + try { + // 例如 "PostgreSQL 13.3 (Ubuntu 13.3-1.pgdg20.04+1)" 提取出 "13" + String[] parts = version.split(" "); + for (String part : parts) { + if (part.matches("\\d+(\\.\\d+)?")) { // 匹配数字版本 + int majorVersion = Integer.parseInt(part.split("\\.")[0]); + return majorVersion >= 13; + } + } + } catch (Exception e) { + log.warn("Failed to parse PostgreSQL version: {}. Assuming version < 13.", version, e); + } + return false; + } + public static Object queryMin( JdbcConnection jdbc, TableId tableId, From 215268ed3475e8bf5919ab89ab515956e206f044 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Thu, 9 Jan 2025 20:41:13 +0800 Subject: [PATCH 11/24] [connector-cdc-postgres] support read partition table --- .../seatunnel/cdc/postgres/PostgresCDCIT.java | 19 ++++++++----------- .../src/test/resources/ddl/inventory.sql | 6 ++++-- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index 0c06013576b..b188a74bd0e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.postgres; -import org.apache.seatunnel.shade.com.google.common.collect.Lists; - import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory; @@ -45,6 +43,7 @@ import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerImageName; +import com.beust.jcommander.internal.Lists; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import lombok.extern.slf4j.Slf4j; @@ -144,6 +143,7 @@ private String driverUrl() { @Override public void startUp() { log.info("The second stage: Starting Postgres containers..."); + POSTGRES_CONTAINER.setPortBindings( Lists.newArrayList( String.format( @@ -209,6 +209,7 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { } return null; }); + await().atMost(60000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { @@ -240,6 +241,7 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE))); }); + System.out.println("111"); } finally { // Clear related content to ensure that multiple operations are not affected clearTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE); @@ -799,28 +801,23 @@ private void upsertDeleteSourcePartionTable(String database, String tableName) { + database + "." + tableName - + " VALUES (3, 'Sample Data 1', '2023-06-15 10:30:00');"); + + " VALUES (2, 'Sample Data 1', '2023-06-15 10:30:00');"); executeSql( "INSERT INTO " + database + "." + tableName - + " VALUES (4, 'Sample Data 2', '2023-07-20 15:45:00');"); + + " VALUES (3, 'Sample Data 2', '2023-07-20 15:45:00');"); - executeSql( - "DELETE FROM " - + database - + "." - + tableName - + " WHERE id = 3 AND event_time = '2023-06-15 10:30:00';"); + executeSql("DELETE FROM " + database + "." + tableName + " WHERE id = 2;"); executeSql( "UPDATE " + database + "." + tableName - + " SET data = 'Updated Data' WHERE id = 4 AND event_time = '2023-07-20 15:45:00';"); + + " SET data = 'Updated Data' WHERE id = 3;"); } private String getQuerySQL(String database, String tableName) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql index 959a8a7ec4d..fe163405d5f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql @@ -205,10 +205,12 @@ CREATE TABLE sink_partitioned_table_2024 PARTITION OF sink_partitioned_table FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2025-01-01 00:00:00'); - +ALTER TABLE source_partitioned_table_2023 REPLICA IDENTITY FULL; +ALTER TABLE source_partitioned_table_2024 REPLICA IDENTITY FULL; ALTER TABLE source_partitioned_table REPLICA IDENTITY FULL; ALTER TABLE sink_partitioned_table REPLICA IDENTITY FULL; - +ALTER TABLE sink_partitioned_table_2023 REPLICA IDENTITY FULL; +ALTER TABLE sink_partitioned_table_2023 REPLICA IDENTITY FULL; INSERT INTO source_partitioned_table (id, data, event_time) VALUES (1, 'Partitioned data 2023', '2023-06-01 12:00:00'), From fba3c751e0ab953a8353264b986072a5dcdcd9c4 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Thu, 9 Jan 2025 20:42:21 +0800 Subject: [PATCH 12/24] [connector-cdc-postgres] support read partition table --- .../jdbc/internal/dialect/psql/PostgresDialect.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index adcafabe1de..f410509ac13 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -244,14 +244,14 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null - && !TablePath.DEFAULT - .getFullName() - .equals(table.getTablePath().getFullName())); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); if (useTableStats) { String rowCountQuery = String.format( - "SELECT reltuples FROM pg_class r WHERE (relkind = 'r' or relkind = 'p') AND relname = '%s';", + "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", table.getTablePath().getTableName()); try (Statement stmt = connection.createStatement()) { log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); From cc10ba308fff8d4326427aa3cdd7e4c3abba8e75 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Fri, 10 Jan 2025 11:15:10 +0800 Subject: [PATCH 13/24] [connector-cdc-postgres] support read partition table --- .../jdbc/internal/dialect/psql/PostgresDialect.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index f410509ac13..b56930303d7 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -244,10 +244,10 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null - && !TablePath.DEFAULT - .getFullName() - .equals(table.getTablePath().getFullName())); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); if (useTableStats) { String rowCountQuery = String.format( From 5a1230060c1c762582eab46b1e35e8d0fec72ab6 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Fri, 10 Jan 2025 13:54:55 +0800 Subject: [PATCH 14/24] [connector-cdc-postgres] support read partition table --- .../seatunnel/cdc/postgres/PostgresCDCIT.java | 105 +++++++++++++++++- ...tgrescdc_to_postgres_with_partition12.conf | 60 ++++++++++ 2 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition12.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index b188a74bd0e..02758382711 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -93,6 +93,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2"; private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3"; private static final String SOURCE_PARTITIONED_TABLE = "source_partitioned_table"; + private static final String SOURCE_PARTITIONED_TABLE_2023 = "source_partitioned_table_2023"; private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1"; private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2"; private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3"; @@ -195,9 +196,60 @@ public void testMPostgresCdcCheckDataE2e(TestContainer container) { } } + private static String resolveVersion(Connection jdbc) { + try (Statement statement = jdbc.createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT version()")) { + resultSet.next(); + return resultSet.getString(1); + } catch (Exception e) { + log.info( + "Failed to get PostgreSQL version, fallback to default version: {}", + e.getMessage(), + e); + return ""; + } + } + + private static boolean isPostgresVersion13OrAbove(String version) { + // 从 PostgreSQL 版本字符串中提取主要版本号 + if (version == null || version.isEmpty()) { + log.warn("PostgreSQL version is empty or null. Assuming version < 13."); + return false; + } + try { + // 例如 "PostgreSQL 13.3 (Ubuntu 13.3-1.pgdg20.04+1)" 提取出 "13" + String[] parts = version.split(" "); + for (String part : parts) { + if (part.matches("\\d+(\\.\\d+)?")) { // 匹配数字版本 + int majorVersion = Integer.parseInt(part.split("\\.")[0]); + return majorVersion >= 13; + } + } + } catch (Exception e) { + log.warn("Failed to parse PostgreSQL version: {}. Assuming version < 13.", version, e); + } + return false; + } + @TestTemplate public void testPostgresPartitionedTableE2e(TestContainer container) { + try (Connection connection = getJdbcConnection(); ) { + String version = resolveVersion(connection); + log.info("Detected PostgreSQL version: {}", version); + + // 判断版本是否大于等于13 + boolean isVersion13OrAbove = isPostgresVersion13OrAbove(version); + if (isVersion13OrAbove) { + Partition13(container); + } else { + Partition12(container); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public void Partition13(TestContainer container) { try { CompletableFuture.supplyAsync( () -> { @@ -241,7 +293,6 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE))); }); - System.out.println("111"); } finally { // Clear related content to ensure that multiple operations are not affected clearTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE); @@ -249,6 +300,58 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { } } + public void Partition12(TestContainer container) { + try { + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/postgrescdc_to_postgres_with_partition12.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SOURCE_PARTITIONED_TABLE_2023)), + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SINK_PARTITIONED_TABLE))); + }); + + // insert update delete + upsertDeleteSourcePartionTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE_2023); + + // stream stage + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SOURCE_PARTITIONED_TABLE_2023)), + query( + getQuerySQL( + POSTGRESQL_SCHEMA, + SINK_PARTITIONED_TABLE))); + }); + + } finally { + // Clear related content to ensure that multiple operations are not affected + clearTable(POSTGRESQL_SCHEMA, SOURCE_PARTITIONED_TABLE_2023); + clearTable(POSTGRESQL_SCHEMA, SINK_PARTITIONED_TABLE); + } + } + @TestTemplate @DisabledOnContainer( value = {}, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition12.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition12.conf new file mode 100644 index 00000000000..b6dfbc158cc --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_partition12.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # Engine configuration + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 +} + +source { + Postgres-CDC { + plugin_output = "partitioned_table_source" + username = "postgres" + password = "postgres" + database-names = ["postgres_cdc"] + schema-names = ["inventory"] + table-names = ["postgres_cdc.inventory.source_partitioned_table_2023"] + base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + + } +} + +transform { + +} + +sink { + jdbc { + plugin_input = "partitioned_table_source" + url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + driver = "org.postgresql.Driver" + user = "postgres" + password = "postgres" + generate_sink_sql = true + database = postgres_cdc + table = inventory.sink_partitioned_table + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode="APPEND_DATA" + } +} \ No newline at end of file From 5102b781811329b25523aab3a0d4a1e537c71592 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Fri, 10 Jan 2025 14:06:37 +0800 Subject: [PATCH 15/24] [connector-cdc-postgres] support read partition table --- .../seatunnel/cdc/postgres/utils/PostgresUtils.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java index 1d15494d644..95b79b82d37 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java @@ -103,18 +103,19 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) String version = resolveVersion(jdbc); log.info("Detected PostgreSQL version: {}", version); - // 判断版本是否大于等于13 + // Determine if the version is greater than or equal to 13 boolean isVersion13OrAbove = isPostgresVersion13OrAbove(version); final String rowCountQuery; if (isVersion13OrAbove) { - // 大于等于 13 的查询 + // Query if the version is greater than or equal to 13 rowCountQuery = String.format( "SELECT reltuples FROM pg_class r WHERE (relkind = 'r' OR relkind = 'p') AND relname = '%s';", tableId.table()); } else { - // 低于 13 的查询或替代方案 + // Provide an alternative if the version requirement is not met (i.e., if the version is + // less than 13) rowCountQuery = String.format( "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", @@ -136,13 +137,13 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) } private static boolean isPostgresVersion13OrAbove(String version) { - // 从 PostgreSQL 版本字符串中提取主要版本号 + // Extracting the major version number from a PostgreSQL version string if (version == null || version.isEmpty()) { log.warn("PostgreSQL version is empty or null. Assuming version < 13."); return false; } try { - // 例如 "PostgreSQL 13.3 (Ubuntu 13.3-1.pgdg20.04+1)" 提取出 "13" + // "PostgreSQL 13.3 (Ubuntu 13.3-1.pgdg20.04+1)" 提取出 "13" String[] parts = version.split(" "); for (String part : parts) { if (part.matches("\\d+(\\.\\d+)?")) { // 匹配数字版本 From c8a879ec7eaab5d8e3b4aa7c805d8fffda4929b1 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Fri, 10 Jan 2025 14:14:28 +0800 Subject: [PATCH 16/24] [connector-cdc-postgres] support read partition table --- .../connectors/seatunnel/cdc/postgres/PostgresCDCIT.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index 02758382711..b654d16e5bb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -211,16 +211,15 @@ private static String resolveVersion(Connection jdbc) { } private static boolean isPostgresVersion13OrAbove(String version) { - // 从 PostgreSQL 版本字符串中提取主要版本号 + if (version == null || version.isEmpty()) { log.warn("PostgreSQL version is empty or null. Assuming version < 13."); return false; } try { - // 例如 "PostgreSQL 13.3 (Ubuntu 13.3-1.pgdg20.04+1)" 提取出 "13" String[] parts = version.split(" "); for (String part : parts) { - if (part.matches("\\d+(\\.\\d+)?")) { // 匹配数字版本 + if (part.matches("\\d+(\\.\\d+)?")) { int majorVersion = Integer.parseInt(part.split("\\.")[0]); return majorVersion >= 13; } @@ -236,8 +235,6 @@ public void testPostgresPartitionedTableE2e(TestContainer container) { try (Connection connection = getJdbcConnection(); ) { String version = resolveVersion(connection); log.info("Detected PostgreSQL version: {}", version); - - // 判断版本是否大于等于13 boolean isVersion13OrAbove = isPostgresVersion13OrAbove(version); if (isVersion13OrAbove) { Partition13(container); From f57b97f7f3c39411e2173e6762518bf514a1210b Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Fri, 10 Jan 2025 14:17:02 +0800 Subject: [PATCH 17/24] [connector-cdc-postgres] support read partition table --- .../seatunnel/cdc/postgres/utils/PostgresUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java index 95b79b82d37..ecae14b90cc 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java @@ -143,10 +143,10 @@ private static boolean isPostgresVersion13OrAbove(String version) { return false; } try { - // "PostgreSQL 13.3 (Ubuntu 13.3-1.pgdg20.04+1)" 提取出 "13" + // "PostgreSQL 13.3 (Ubuntu 13.3-1.pgdg20.04+1)" "13" String[] parts = version.split(" "); for (String part : parts) { - if (part.matches("\\d+(\\.\\d+)?")) { // 匹配数字版本 + if (part.matches("\\d+(\\.\\d+)?")) { // Matching numeric versions int majorVersion = Integer.parseInt(part.split("\\.")[0]); return majorVersion >= 13; } From becab95280e55209162623cb73409293e0d52681 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 12 Jan 2025 15:24:04 +0800 Subject: [PATCH 18/24] [Feature][Imap] Support for persistent cluster map without loading information on completed jobs during startup --- .../engine/server/CoordinatorService.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 5a65f248ae2..fe831cd17c9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.engine.server; +import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.seatunnel.api.common.metrics.JobMetrics; @@ -396,7 +397,23 @@ private void initCoordinatorService() { ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES); metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); - + IMap finishedJobStateImap = + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE); + IMap finishedJobMetricsImap = + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_METRICS); + IMap finishedJobVertexInfoImap = + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO); + if(ServerConfigOptions.IS_INIT_HISTORY_FINISH_JOB.defaultValue()){ + for (Map.Entry entry : finishedJobStateImap.entrySet()) { + finishedJobStateImap.delete(entry.getKey()); + } + for (Map.Entry entry : finishedJobMetricsImap.entrySet()) { + finishedJobMetricsImap.delete(entry.getKey()); + } + for (Map.Entry entry : finishedJobVertexInfoImap.entrySet()) { + finishedJobVertexInfoImap.delete(entry.getKey()); + } + } jobHistoryService = new JobHistoryService( nodeEngine, @@ -404,13 +421,9 @@ private void initCoordinatorService() { logger, pendingJobMasterMap, runningJobMasterMap, - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE), - nodeEngine - .getHazelcastInstance() - .getMap(Constant.IMAP_FINISHED_JOB_METRICS), - nodeEngine - .getHazelcastInstance() - .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO), + finishedJobStateImap, + finishedJobMetricsImap, + finishedJobVertexInfoImap, engineConfig.getHistoryJobExpireMinutes()); eventProcessor = createJobEventProcessor( From 0b715257a1a0ac2da9f56f8e080ff2861220986c Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 12 Jan 2025 15:24:17 +0800 Subject: [PATCH 19/24] [Feature][Imap] Support for persistent cluster map without loading information on completed jobs during startup --- .../engine/common/config/server/ServerConfigOptions.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index e0de7617429..da17ab3eb9a 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -156,7 +156,11 @@ public class ServerConfigOptions { .intType() .defaultValue(1440) .withDescription("The expire time of history jobs.time unit minute"); - + public static final Option IS_INIT_HISTORY_FINISH_JOB = + Options.key("is-init-history-finish-job") + .booleanType() + .defaultValue(false) + .withDescription("During the restart of a persistent cluster, is it necessary to reload the information related to completed jobs from the persistence files anew?"); public static final Option JOB_SCHEDULE_STRATEGY = Options.key("job-schedule-strategy") .enumType(ScheduleStrategy.class) From f1a8e507af48fb36bb421e78049ffcd07e44ce3c Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 12 Jan 2025 15:32:19 +0800 Subject: [PATCH 20/24] [Feature][Imap] Support for persistent cluster map without loading information on completed jobs during startup --- .../apache/seatunnel/engine/server/CoordinatorService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index fe831cd17c9..965a621348e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.engine.server; -import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.seatunnel.api.common.metrics.JobMetrics; @@ -33,6 +32,7 @@ import org.apache.seatunnel.engine.common.config.EngineConfig; import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig; import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy; +import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; import org.apache.seatunnel.engine.common.exception.JobException; import org.apache.seatunnel.engine.common.exception.JobNotFoundException; import org.apache.seatunnel.engine.common.exception.SavePointFailedException; @@ -403,8 +403,9 @@ private void initCoordinatorService() { nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_METRICS); IMap finishedJobVertexInfoImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO); - if(ServerConfigOptions.IS_INIT_HISTORY_FINISH_JOB.defaultValue()){ - for (Map.Entry entry : finishedJobStateImap.entrySet()) { + if (ServerConfigOptions.IS_INIT_HISTORY_FINISH_JOB.defaultValue()) { + for (Map.Entry entry : + finishedJobStateImap.entrySet()) { finishedJobStateImap.delete(entry.getKey()); } for (Map.Entry entry : finishedJobMetricsImap.entrySet()) { From 9231a474cf01e6f3336bdf3ee0eb40d708fe253a Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 12 Jan 2025 15:37:04 +0800 Subject: [PATCH 21/24] [Feature][Imap] Support for persistent cluster map without loading information on completed jobs during startup --- .../engine/common/config/server/ServerConfigOptions.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index da17ab3eb9a..8f5da4e1983 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -160,7 +160,8 @@ public class ServerConfigOptions { Options.key("is-init-history-finish-job") .booleanType() .defaultValue(false) - .withDescription("During the restart of a persistent cluster, is it necessary to reload the information related to completed jobs from the persistence files anew?"); + .withDescription( + "During the restart of a persistent cluster, is it necessary to reload the information related to completed jobs from the persistence files anew?"); public static final Option JOB_SCHEDULE_STRATEGY = Options.key("job-schedule-strategy") .enumType(ScheduleStrategy.class) From ad7750f6058d554f29cda052cf89390495e284a3 Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 12 Jan 2025 15:48:49 +0800 Subject: [PATCH 22/24] The reason for the inquiry is that the previous code submission has not yet been approved. --- .../config/server/ServerConfigOptions.java | 6 ---- .../engine/server/CoordinatorService.java | 29 +++++-------------- 2 files changed, 7 insertions(+), 28 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 8f5da4e1983..01b16630d1f 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -156,12 +156,6 @@ public class ServerConfigOptions { .intType() .defaultValue(1440) .withDescription("The expire time of history jobs.time unit minute"); - public static final Option IS_INIT_HISTORY_FINISH_JOB = - Options.key("is-init-history-finish-job") - .booleanType() - .defaultValue(false) - .withDescription( - "During the restart of a persistent cluster, is it necessary to reload the information related to completed jobs from the persistence files anew?"); public static final Option JOB_SCHEDULE_STRATEGY = Options.key("job-schedule-strategy") .enumType(ScheduleStrategy.class) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 965a621348e..57f9c64ed33 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -32,7 +32,6 @@ import org.apache.seatunnel.engine.common.config.EngineConfig; import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig; import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy; -import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; import org.apache.seatunnel.engine.common.exception.JobException; import org.apache.seatunnel.engine.common.exception.JobNotFoundException; import org.apache.seatunnel.engine.common.exception.SavePointFailedException; @@ -397,24 +396,6 @@ private void initCoordinatorService() { ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES); metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); - IMap finishedJobStateImap = - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE); - IMap finishedJobMetricsImap = - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_METRICS); - IMap finishedJobVertexInfoImap = - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO); - if (ServerConfigOptions.IS_INIT_HISTORY_FINISH_JOB.defaultValue()) { - for (Map.Entry entry : - finishedJobStateImap.entrySet()) { - finishedJobStateImap.delete(entry.getKey()); - } - for (Map.Entry entry : finishedJobMetricsImap.entrySet()) { - finishedJobMetricsImap.delete(entry.getKey()); - } - for (Map.Entry entry : finishedJobVertexInfoImap.entrySet()) { - finishedJobVertexInfoImap.delete(entry.getKey()); - } - } jobHistoryService = new JobHistoryService( nodeEngine, @@ -422,9 +403,13 @@ private void initCoordinatorService() { logger, pendingJobMasterMap, runningJobMasterMap, - finishedJobStateImap, - finishedJobMetricsImap, - finishedJobVertexInfoImap, + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE), + nodeEngine + .getHazelcastInstance() + .getMap(Constant.IMAP_FINISHED_JOB_METRICS), + nodeEngine + .getHazelcastInstance() + .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO), engineConfig.getHistoryJobExpireMinutes()); eventProcessor = createJobEventProcessor( From 186c5920e180a7ad7e89d948b8b708f48d924a9b Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 12 Jan 2025 15:55:41 +0800 Subject: [PATCH 23/24] The reason for the inquiry is that the previous code submission has not yet been approved. --- .../org/apache/seatunnel/engine/server/CoordinatorService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 57f9c64ed33..5a65f248ae2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -396,6 +396,7 @@ private void initCoordinatorService() { ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES); metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); + jobHistoryService = new JobHistoryService( nodeEngine, From 4d83a041e7e1302af4901a22bcdc07f4b4e56aac Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 12 Jan 2025 15:56:39 +0800 Subject: [PATCH 24/24] The reason for the inquiry is that the previous code submission has not yet been approved. --- .../engine/common/config/server/ServerConfigOptions.java | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 01b16630d1f..e0de7617429 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -156,6 +156,7 @@ public class ServerConfigOptions { .intType() .defaultValue(1440) .withDescription("The expire time of history jobs.time unit minute"); + public static final Option JOB_SCHEDULE_STRATEGY = Options.key("job-schedule-strategy") .enumType(ScheduleStrategy.class)