diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
index 5a95f1efd52..f3cfa80e72d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
@@ -44,6 +44,9 @@ public class PaimonDataSink implements DataSink, Serializable {
     // options for creating Paimon table.
     private final Map<String, String> tableOptions;
 
+    // options that apply to a specific table
+    private final Map<String, String> tableSpecificOptions;
+
     private final String commitUser;
 
     private final Map<TableId, List<String>> partitionMaps;
@@ -57,6 +60,7 @@ public class PaimonDataSink implements DataSink, Serializable {
     public PaimonDataSink(
             Options options,
             Map<String, String> tableOptions,
+            Map<String, String> tableSpecificOptions,
             String commitUser,
             Map<TableId, List<String>> partitionMaps,
             PaimonRecordSerializer<Event> serializer,
@@ -64,6 +68,7 @@ public PaimonDataSink(
             String schemaOperatorUid) {
         this.options = options;
         this.tableOptions = tableOptions;
+        this.tableSpecificOptions = tableSpecificOptions;
         this.commitUser = commitUser;
         this.partitionMaps = partitionMaps;
         this.serializer = serializer;
@@ -79,7 +84,8 @@ public EventSinkProvider getEventSinkProvider() {
 
     @Override
     public MetadataApplier getMetadataApplier() {
-        return new PaimonMetadataApplier(options, tableOptions, partitionMaps);
+        return new PaimonMetadataApplier(
+                options, tableOptions, tableSpecificOptions, partitionMaps);
     }
 
     @Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
index a165946d4dd..5b3d7ca1c3c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
@@ -39,6 +39,7 @@
 import java.util.Set;
 
 import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
+import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_SPECIFIC_TABLE_PROPERTIES;
 import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES;
 
 /** A {@link DataSinkFactory} to create {@link PaimonDataSink}. */
@@ -49,15 +50,21 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
     @Override
     public DataSink createDataSink(Context context) {
         FactoryHelper.createFactoryHelper(this, context)
-                .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
+                .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_SPECIFIC_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
 
         Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
         Map<String, String> catalogOptions = new HashMap<>();
         Map<String, String> tableOptions = new HashMap<>();
+        Map<String, String> tableSpecificOptions = new HashMap<>();
         allOptions.forEach(
                 (key, value) -> {
                     if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
-                        tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value);
+                        String keyWithoutPrefix = key.substring(PREFIX_TABLE_PROPERTIES.length());
+                        // General table option: table.properties.<key>
+                        tableOptions.put(keyWithoutPrefix, value);
+                    } else if (key.startsWith(PREFIX_SPECIFIC_TABLE_PROPERTIES)) {
+                        // Table-specific option: specific.table.properties.<schemaName>.<tableName>.<key>
+                        tableSpecificOptions.put(key, value);
                     } else if (key.startsWith(PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) {
                         catalogOptions.put(
                                 key.substring(
@@ -103,6 +110,7 @@ public DataSink createDataSink(Context context) {
         return new PaimonDataSink(
                 options,
                 tableOptions,
+                tableSpecificOptions,
                 commitUser,
                 partitionMaps,
                 serializer,
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java
index 5f2712a93f6..574d4dda989 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java
@@ -29,6 +29,9 @@ public class PaimonDataSinkOptions {
     // prefix for passing properties for table creation.
     public static final String PREFIX_TABLE_PROPERTIES = "table.properties.";
 
+    // prefix for passing table-specific properties.
+    public static final String PREFIX_SPECIFIC_TABLE_PROPERTIES = "specific.table.properties.";
+
     // prefix for passing properties for catalog creation.
     public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";
 
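Reviewer note: as the routing above shows, "table.properties." keys are stripped of their prefix before reaching tableOptions, while "specific.table.properties." keys are stored whole so the table-qualified prefix can be matched later in PaimonMetadataApplier. A minimal sketch of how one flat sink configuration is partitioned (the warehouse path, database, and table names below are hypothetical, not from this change):

    Map<String, String> allOptions = new HashMap<>();
    // routed to catalogOptions under the key "warehouse"
    allOptions.put("catalog.properties.warehouse", "/tmp/warehouse");
    // routed to tableOptions under the key "bucket"; applies to every created table
    allOptions.put("table.properties.bucket", "10");
    // routed to tableSpecificOptions with the key left intact;
    // applies only when a table named app_db.orders is created
    allOptions.put("specific.table.properties.app_db.orders.bucket", "4");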
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 06639ecaf7a..291fd387994 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -67,9 +67,12 @@ public class PaimonMetadataApplier implements MetadataApplier {
     // Catalog is unSerializable.
     private transient Catalog catalog;
 
-    // currently, we set table options for all tables using the same options.
+    // options that apply to all tables
    private final Map<String, String> tableOptions;
 
+    // options that apply to a specific table
+    private final Map<String, String> tableSpecificOptions;
+
     private final Options catalogOptions;
 
     private final Map<TableId, List<String>> partitionMaps;
@@ -79,6 +82,7 @@ public class PaimonMetadataApplier implements MetadataApplier {
     public PaimonMetadataApplier(Options catalogOptions) {
         this.catalogOptions = catalogOptions;
         this.tableOptions = new HashMap<>();
+        this.tableSpecificOptions = new HashMap<>();
         this.partitionMaps = new HashMap<>();
         this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
     }
@@ -86,9 +90,11 @@ public PaimonMetadataApplier(Options catalogOptions) {
     public PaimonMetadataApplier(
             Options catalogOptions,
             Map<String, String> tableOptions,
+            Map<String, String> tableSpecificOptions,
             Map<TableId, List<String>> partitionMaps) {
         this.catalogOptions = catalogOptions;
         this.tableOptions = tableOptions;
+        this.tableSpecificOptions = tableSpecificOptions;
         this.partitionMaps = partitionMaps;
         this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
     }
@@ -189,10 +195,32 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException {
                     primaryKeys.add(partitionColumn);
                 }
             }
+
+            // Build effective options: start with general table options, then add table-specific options
+            // Table-specific options take precedence over general table options for the same key
+            Map<String, String> effectiveOptions = new HashMap<>();
+
+            // First, add general table options
+            effectiveOptions.putAll(tableOptions);
+
+            // Then, add table-specific options for this table (these take precedence)
+            String tablePrefix =
+                    "specific.table.properties."
+                            + event.tableId().getSchemaName()
+                            + "."
+                            + event.tableId().getTableName()
+                            + ".";
+            for (Map.Entry<String, String> entry : tableSpecificOptions.entrySet()) {
+                if (entry.getKey().startsWith(tablePrefix)) {
+                    String propertyName = entry.getKey().substring(tablePrefix.length());
+                    effectiveOptions.put(propertyName, entry.getValue());
+                }
+            }
+
             builder.partitionKeys(partitionKeys)
                     .primaryKey(primaryKeys)
                     .comment(schema.comment())
-                    .options(tableOptions)
+                    .options(effectiveOptions)
                     .options(schema.options());
             catalog.createTable(tableIdToIdentifier(event), builder.build(), true);
         } catch (Catalog.TableAlreadyExistException
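Reviewer note: the effective-options merge in applyCreateTable reduces to the standalone sketch below (the helper name and signature are illustrative, not part of this change). General options are copied first, then any key carrying this table's "specific.table.properties.<schemaName>.<tableName>." prefix overwrites the matching entry; the builder still layers schema.options() on afterwards, as before.

    // Table-specific options win over general table options on key conflicts.
    static Map<String, String> effectiveOptionsFor(
            String schemaName,
            String tableName,
            Map<String, String> tableOptions,
            Map<String, String> tableSpecificOptions) {
        Map<String, String> effective = new HashMap<>(tableOptions);
        String prefix = "specific.table.properties." + schemaName + "." + tableName + ".";
        tableSpecificOptions.forEach(
                (key, value) -> {
                    if (key.startsWith(prefix)) {
                        effective.put(key.substring(prefix.length()), value);
                    }
                });
        return effective;
    }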
tableOptions.put("bucket", "10"); MetadataApplier metadataApplier = - new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + new PaimonMetadataApplier( + catalogOptions, tableOptions, new HashMap<>(), new HashMap<>()); Schema schema = Schema.newBuilder() .physicalColumn("col1", DataTypes.STRING().notNull()) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index d349d607022..8a8d3362a0e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -250,7 +250,8 @@ public void testCreateTableWithoutPrimaryKey(String metastore) Map tableOptions = new HashMap<>(); tableOptions.put("bucket", "-1"); MetadataApplier metadataApplier = - new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + new PaimonMetadataApplier( + catalogOptions, tableOptions, new HashMap<>(), new HashMap<>()); CreateTableEvent createTableEvent = new CreateTableEvent( TableId.parse("test.table1"), @@ -295,7 +296,8 @@ void testCreateTableWithOptions(String metastore) Map> partitionMaps = new HashMap<>(); partitionMaps.put(TableId.parse("test.table1"), Arrays.asList("col3", "col4")); MetadataApplier metadataApplier = - new PaimonMetadataApplier(catalogOptions, tableOptions, partitionMaps); + new PaimonMetadataApplier( + catalogOptions, tableOptions, new HashMap<>(), partitionMaps); CreateTableEvent createTableEvent = new CreateTableEvent( TableId.parse("test.table1"), @@ -540,7 +542,8 @@ public void testCreateTableWithComment(String metastore) Map tableOptions = new HashMap<>(); tableOptions.put("bucket", "-1"); MetadataApplier metadataApplier = - new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + new PaimonMetadataApplier( + catalogOptions, tableOptions, new HashMap<>(), new HashMap<>()); CreateTableEvent createTableEvent = new CreateTableEvent( TableId.parse("test.table_with_comment"), @@ -580,4 +583,139 @@ public void testCreateTableWithComment(String metastore) Assertions.assertThat(table.options()).containsEntry("bucket", "-1"); Assertions.assertThat(table.comment()).contains("comment of table_with_comment"); } + + @ParameterizedTest + @ValueSource(strings = {"filesystem", "hive"}) + void testCreateTableWithTableSpecificOptions(String metastore) + throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + tableOptions.put("bucket", "10"); + + Map tableSpecificOptions = new HashMap<>(); + // Table-specific option for test.table1 + tableSpecificOptions.put("specific.table.properties.test.table1.compaction.min.file-num", "5"); + tableSpecificOptions.put("specific.table.properties.test.table1.write-buffer-size", "256MB"); + // Table-specific option for test.table2 + tableSpecificOptions.put("specific.table.properties.test.table2.compaction.min.file-num", "3"); + 
tableSpecificOptions.put("specific.table.properties.test.table2.write-buffer-size", "128MB"); + + MetadataApplier metadataApplier = + new PaimonMetadataApplier( + catalogOptions, tableOptions, tableSpecificOptions, new HashMap<>()); + + // Create first table + CreateTableEvent createTableEvent1 = + new CreateTableEvent( + TableId.parse("test.table1"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent1); + Table table1 = catalog.getTable(Identifier.fromString("test.table1")); + + // Verify table1 has general table options and its specific options + Assertions.assertThat(table1.options()).containsEntry("bucket", "10"); + Assertions.assertThat(table1.options()).containsEntry("compaction.min.file-num", "5"); + Assertions.assertThat(table1.options()).containsEntry("write-buffer-size", "256MB"); + + // Create second table + CreateTableEvent createTableEvent2 = + new CreateTableEvent( + TableId.parse("test.table2"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent2); + Table table2 = catalog.getTable(Identifier.fromString("test.table2")); + + // Verify table2 has general table options and its specific options (different from table1) + Assertions.assertThat(table2.options()).containsEntry("bucket", "10"); + Assertions.assertThat(table2.options()).containsEntry("compaction.min.file-num", "3"); + Assertions.assertThat(table2.options()).containsEntry("write-buffer-size", "128MB"); + + // Create third table with no specific options + CreateTableEvent createTableEvent3 = + new CreateTableEvent( + TableId.parse("test.table3"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent3); + Table table3 = catalog.getTable(Identifier.fromString("test.table3")); + + // Verify table3 only has general table options + Assertions.assertThat(table3.options()).containsEntry("bucket", "10"); + Assertions.assertThat(table3.options()).doesNotContainKey("compaction.min.file-num"); + Assertions.assertThat(table3.options()).doesNotContainKey("write-buffer-size"); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem", "hive"}) + void testTableSpecificOptionsPrecedenceOverTableOptions(String metastore) + throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + tableOptions.put("bucket", "10"); + tableOptions.put( + "compaction.min.file-num", "8"); // This should be overridden by table-specific setting + + Map tableSpecificOptions = new HashMap<>(); + // Table-specific option that conflicts with general table option + tableSpecificOptions.put("specific.table.properties.test.table1.compaction.min.file-num", "5"); + 
tableSpecificOptions.put("specific.table.properties.test.table1.write-buffer-size", "256MB"); + + MetadataApplier metadataApplier = + new PaimonMetadataApplier( + catalogOptions, tableOptions, tableSpecificOptions, new HashMap<>()); + + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.table1"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + Table table = catalog.getTable(Identifier.fromString("test.table1")); + + // Verify table-specific options take precedence over general table options + Assertions.assertThat(table.options()).containsEntry("bucket", "10"); + Assertions.assertThat(table.options()) + .containsEntry( + "compaction.min.file-num", + "5"); // Should be 5 (table-specific), not 8 (general table option) + Assertions.assertThat(table.options()) + .containsEntry( + "write-buffer-size", "256MB"); // Should be from table-specific options + } }
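Usage sketch: in a pipeline definition, the new prefix sits next to the existing ones as plain sink options. The YAML below is illustrative only (database/table names and values are hypothetical; only the "specific.table.properties." prefix is introduced by this change):

    sink:
      type: paimon
      catalog.properties.warehouse: /tmp/warehouse
      # applied to every table the pipeline creates
      table.properties.bucket: "10"
      # applied only when creating app_db.orders; overrides table.properties.bucket for that table
      specific.table.properties.app_db.orders.bucket: "4"
      specific.table.properties.app_db.orders.compaction.min.file-num: "5"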