[pipeline-connector/paimon] Support table specific table.properties configuration #4078

Open · wants to merge 1 commit into master
File: PaimonDataSink.java

@@ -44,6 +44,9 @@ public class PaimonDataSink implements DataSink, Serializable {
// options for creating Paimon table.
private final Map<String, String> tableOptions;

// options that apply only to specific tables; keys keep the full specific.table.properties. prefix
private final Map<String, String> tableSpecificOptions;

private final String commitUser;

private final Map<TableId, List<String>> partitionMaps;
@@ -57,13 +60,15 @@ public class PaimonDataSink implements DataSink, Serializable {
public PaimonDataSink(
Options options,
Map<String, String> tableOptions,
Map<String, String> tableSpecificOptions,
String commitUser,
Map<TableId, List<String>> partitionMaps,
PaimonRecordSerializer<Event> serializer,
ZoneId zoneId,
String schemaOperatorUid) {
this.options = options;
this.tableOptions = tableOptions;
this.tableSpecificOptions = tableSpecificOptions;
this.commitUser = commitUser;
this.partitionMaps = partitionMaps;
this.serializer = serializer;
@@ -79,7 +84,8 @@ public EventSinkProvider getEventSinkProvider() {

@Override
public MetadataApplier getMetadataApplier() {
return new PaimonMetadataApplier(options, tableOptions, partitionMaps);
return new PaimonMetadataApplier(
options, tableOptions, tableSpecificOptions, partitionMaps);
}

@Override
File: PaimonDataSinkFactory.java

@@ -39,6 +39,7 @@
import java.util.Set;

import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_SPECIFIC_TABLE_PROPERTIES;
import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES;

/** A {@link DataSinkFactory} to create {@link PaimonDataSink}. */
@@ -49,15 +50,21 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_SPECIFIC_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);

Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
Map<String, String> catalogOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();
Map<String, String> tableSpecificOptions = new HashMap<>();
allOptions.forEach(
(key, value) -> {
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value);
String keyWithoutPrefix = key.substring(PREFIX_TABLE_PROPERTIES.length());
// General table option: table.properties.<property>
tableOptions.put(keyWithoutPrefix, value);
} else if (key.startsWith(PREFIX_SPECIFIC_TABLE_PROPERTIES)) {
[Review comment]
Shouldn't we flip these so that PREFIX_SPECIFIC_TABLE_PROPERTIES takes priority over PREFIX_TABLE_PROPERTIES?

[Review comment]
Actually… I just re-read it and realised this is the for-each that parses the options, not the place where they're actually applied.

[Author]
That is the behavior; the test case testTableSpecificOptionsPrecedenceOverTableOptions demonstrates it. This function only parses the configuration into separate maps; the table-specific overrides are applied later, when a table is created and its properties are set (see the sketch at the end of this file's diff).

// Table-specific option: specific.table.properties.<database>.<table>.<property>
tableSpecificOptions.put(key, value);
} else if (key.startsWith(PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) {
catalogOptions.put(
key.substring(
@@ -103,6 +110,7 @@ public DataSink createDataSink(Context context) {
return new PaimonDataSink(
options,
tableOptions,
tableSpecificOptions,
commitUser,
partitionMaps,
serializer,
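
A minimal, self-contained sketch of the two phases discussed in the review thread above: prefix parsing in the factory, then override application when a table is created. The database, table, and option values are hypothetical, and OptionPrecedenceSketch is illustrative, not part of this PR:

import java.util.HashMap;
import java.util.Map;

public class OptionPrecedenceSketch {
    public static void main(String[] args) {
        // Phase 1 (factory): raw sink options are split by prefix; ordering of
        // the checks does not matter here, because nothing is overridden yet.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("bucket", "10"); // from table.properties.bucket

        Map<String, String> tableSpecificOptions = new HashMap<>();
        // stored with its full key, prefix included
        tableSpecificOptions.put("specific.table.properties.app_db.orders.bucket", "4");

        // Phase 2 (metadata applier): when app_db.orders is created, general
        // options are applied first, then matching table-specific ones win.
        Map<String, String> effectiveOptions = new HashMap<>(tableOptions);
        String tablePrefix = "specific.table.properties.app_db.orders.";
        for (Map.Entry<String, String> entry : tableSpecificOptions.entrySet()) {
            if (entry.getKey().startsWith(tablePrefix)) {
                effectiveOptions.put(
                        entry.getKey().substring(tablePrefix.length()), entry.getValue());
            }
        }
        System.out.println(effectiveOptions); // prints {bucket=4}
    }
}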
File: PaimonDataSinkOptions.java

@@ -29,6 +29,9 @@ public class PaimonDataSinkOptions {
// prefix for passing properties for table creation.
public static final String PREFIX_TABLE_PROPERTIES = "table.properties.";

// prefix for passing table-specific properties.
public static final String PREFIX_SPECIFIC_TABLE_PROPERTIES = "specific.table.properties.";

// prefix for passing properties for catalog creation.
public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";
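
For example (the database, table, and option values below are illustrative, not taken from this PR), a sink configuration could set:

    table.properties.bucket = 10
    specific.table.properties.app_db.orders.bucket = 4

Every created table would then get bucket=10, except app_db.orders, which would be created with bucket=4.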

File: PaimonMetadataApplier.java

@@ -67,9 +67,12 @@ public class PaimonMetadataApplier implements MetadataApplier {
// Catalog is unSerializable.
private transient Catalog catalog;

// currently, we set table options for all tables using the same options.
// options that apply to all tables
private final Map<String, String> tableOptions;

// options that apply only to specific tables; keys keep the full specific.table.properties. prefix
private final Map<String, String> tableSpecificOptions;

private final Options catalogOptions;

private final Map<TableId, List<String>> partitionMaps;
@@ -79,16 +82,19 @@ public PaimonMetadataApplier(Options catalogOptions) {
public PaimonMetadataApplier(Options catalogOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = new HashMap<>();
this.tableSpecificOptions = new HashMap<>();
this.partitionMaps = new HashMap<>();
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}

public PaimonMetadataApplier(
Options catalogOptions,
Map<String, String> tableOptions,
Map<String, String> tableSpecificOptions,
Map<TableId, List<String>> partitionMaps) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.tableSpecificOptions = tableSpecificOptions;
this.partitionMaps = partitionMaps;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
@@ -189,10 +195,32 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException {
primaryKeys.add(partitionColumn);
}
}

// Build effective options: start with general table options, then add table-specific options
// Table-specific options take precedence over general table options for the same key
Map<String, String> effectiveOptions = new HashMap<>();

// First, add general table options
effectiveOptions.putAll(tableOptions);

// Then, add table-specific options for this table (these take precedence)
String tablePrefix =
"specific.table.properties."
+ event.tableId().getSchemaName()
+ "."
+ event.tableId().getTableName()
+ ".";
for (Map.Entry<String, String> entry : tableSpecificOptions.entrySet()) {
if (entry.getKey().startsWith(tablePrefix)) {
String propertyName = entry.getKey().substring(tablePrefix.length());
effectiveOptions.put(propertyName, entry.getValue());
}
}

builder.partitionKeys(partitionKeys)
.primaryKey(primaryKeys)
.comment(schema.comment())
.options(tableOptions)
.options(effectiveOptions)
.options(schema.options());
catalog.createTable(tableIdToIdentifier(event), builder.build(), true);
} catch (Catalog.TableAlreadyExistException
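
A note on the builder chain above: assuming Paimon's Schema.Builder.options(Map) merges entries with put-all semantics (later calls override earlier ones on duplicate keys), the effective precedence from lowest to highest is table.properties.*, then specific.table.properties.<database>.<table>.*, then any options already present on the source schema.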
File: (test class, name not shown)

@@ -76,7 +76,8 @@ public void testHashCodeForAppendOnlyTable() {
TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
Map<String, String> tableOptions = new HashMap<>();
MetadataApplier metadataApplier =
new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
new PaimonMetadataApplier(
catalogOptions, tableOptions, new HashMap<>(), new HashMap<>());
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING().notNull())
@@ -132,7 +133,8 @@ void testHashCodeForFixedBucketTable() {
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("bucket", "10");
MetadataApplier metadataApplier =
new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
new PaimonMetadataApplier(
catalogOptions, tableOptions, new HashMap<>(), new HashMap<>());
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING().notNull())
File: (test class, name not shown)

@@ -250,7 +250,8 @@ public void testCreateTableWithoutPrimaryKey(String metastore)
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("bucket", "-1");
MetadataApplier metadataApplier =
new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
new PaimonMetadataApplier(
catalogOptions, tableOptions, new HashMap<>(), new HashMap<>());
CreateTableEvent createTableEvent =
new CreateTableEvent(
TableId.parse("test.table1"),
@@ -295,7 +296,8 @@ void testCreateTableWithOptions(String metastore)
Map<TableId, List<String>> partitionMaps = new HashMap<>();
partitionMaps.put(TableId.parse("test.table1"), Arrays.asList("col3", "col4"));
MetadataApplier metadataApplier =
new PaimonMetadataApplier(catalogOptions, tableOptions, partitionMaps);
new PaimonMetadataApplier(
catalogOptions, tableOptions, new HashMap<>(), partitionMaps);
CreateTableEvent createTableEvent =
new CreateTableEvent(
TableId.parse("test.table1"),
@@ -540,7 +542,8 @@ public void testCreateTableWithComment(String metastore)
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("bucket", "-1");
MetadataApplier metadataApplier =
new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
new PaimonMetadataApplier(
catalogOptions, tableOptions, new HashMap<>(), new HashMap<>());
CreateTableEvent createTableEvent =
new CreateTableEvent(
TableId.parse("test.table_with_comment"),
@@ -580,4 +583,139 @@ public void testCreateTableWithComment(String metastore)
Assertions.assertThat(table.options()).containsEntry("bucket", "-1");
Assertions.assertThat(table.comment()).contains("comment of table_with_comment");
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
void testCreateTableWithTableSpecificOptions(String metastore)
throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("bucket", "10");

Map<String, String> tableSpecificOptions = new HashMap<>();
// Table-specific option for test.table1
tableSpecificOptions.put("specific.table.properties.test.table1.compaction.min.file-num", "5");
tableSpecificOptions.put("specific.table.properties.test.table1.write-buffer-size", "256MB");
// Table-specific option for test.table2
tableSpecificOptions.put("specific.table.properties.test.table2.compaction.min.file-num", "3");
tableSpecificOptions.put("specific.table.properties.test.table2.write-buffer-size", "128MB");

MetadataApplier metadataApplier =
new PaimonMetadataApplier(
catalogOptions, tableOptions, tableSpecificOptions, new HashMap<>());

// Create first table
CreateTableEvent createTableEvent1 =
new CreateTableEvent(
TableId.parse("test.table1"),
org.apache.flink.cdc.common.schema.Schema.newBuilder()
.physicalColumn(
"col1",
org.apache.flink.cdc.common.types.DataTypes.STRING()
.notNull())
.physicalColumn(
"col2",
org.apache.flink.cdc.common.types.DataTypes.STRING())
.primaryKey("col1")
.build());
metadataApplier.applySchemaChange(createTableEvent1);
Table table1 = catalog.getTable(Identifier.fromString("test.table1"));

// Verify table1 has general table options and its specific options
Assertions.assertThat(table1.options()).containsEntry("bucket", "10");
Assertions.assertThat(table1.options()).containsEntry("compaction.min.file-num", "5");
Assertions.assertThat(table1.options()).containsEntry("write-buffer-size", "256MB");

// Create second table
CreateTableEvent createTableEvent2 =
new CreateTableEvent(
TableId.parse("test.table2"),
org.apache.flink.cdc.common.schema.Schema.newBuilder()
.physicalColumn(
"col1",
org.apache.flink.cdc.common.types.DataTypes.STRING()
.notNull())
.physicalColumn(
"col2",
org.apache.flink.cdc.common.types.DataTypes.STRING())
.primaryKey("col1")
.build());
metadataApplier.applySchemaChange(createTableEvent2);
Table table2 = catalog.getTable(Identifier.fromString("test.table2"));

// Verify table2 has general table options and its specific options (different from table1)
Assertions.assertThat(table2.options()).containsEntry("bucket", "10");
Assertions.assertThat(table2.options()).containsEntry("compaction.min.file-num", "3");
Assertions.assertThat(table2.options()).containsEntry("write-buffer-size", "128MB");

// Create third table with no specific options
CreateTableEvent createTableEvent3 =
new CreateTableEvent(
TableId.parse("test.table3"),
org.apache.flink.cdc.common.schema.Schema.newBuilder()
.physicalColumn(
"col1",
org.apache.flink.cdc.common.types.DataTypes.STRING()
.notNull())
.physicalColumn(
"col2",
org.apache.flink.cdc.common.types.DataTypes.STRING())
.primaryKey("col1")
.build());
metadataApplier.applySchemaChange(createTableEvent3);
Table table3 = catalog.getTable(Identifier.fromString("test.table3"));

// Verify table3 only has general table options
Assertions.assertThat(table3.options()).containsEntry("bucket", "10");
Assertions.assertThat(table3.options()).doesNotContainKey("compaction.min.file-num");
Assertions.assertThat(table3.options()).doesNotContainKey("write-buffer-size");
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
void testTableSpecificOptionsPrecedenceOverTableOptions(String metastore)
throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("bucket", "10");
tableOptions.put(
"compaction.min.file-num", "8"); // This should be overridden by table-specific setting

Map<String, String> tableSpecificOptions = new HashMap<>();
// Table-specific option that conflicts with general table option
tableSpecificOptions.put("specific.table.properties.test.table1.compaction.min.file-num", "5");
tableSpecificOptions.put("specific.table.properties.test.table1.write-buffer-size", "256MB");

MetadataApplier metadataApplier =
new PaimonMetadataApplier(
catalogOptions, tableOptions, tableSpecificOptions, new HashMap<>());

CreateTableEvent createTableEvent =
new CreateTableEvent(
TableId.parse("test.table1"),
org.apache.flink.cdc.common.schema.Schema.newBuilder()
.physicalColumn(
"col1",
org.apache.flink.cdc.common.types.DataTypes.STRING()
.notNull())
.physicalColumn(
"col2",
org.apache.flink.cdc.common.types.DataTypes.STRING())
.primaryKey("col1")
.build());
metadataApplier.applySchemaChange(createTableEvent);
Table table = catalog.getTable(Identifier.fromString("test.table1"));

// Verify table-specific options take precedence over general table options
Assertions.assertThat(table.options()).containsEntry("bucket", "10");
Assertions.assertThat(table.options())
.containsEntry(
"compaction.min.file-num",
"5"); // Should be 5 (table-specific), not 8 (general table option)
Assertions.assertThat(table.options())
.containsEntry(
"write-buffer-size", "256MB"); // Should be from table-specific options
}
}