[FLINK-38206][pipeline-connector][paimon] Support writing to an existing table with a schema inconsistent with upstream. #4081

Open: wants to merge 4 commits into base: master

DataField.java
@@ -20,6 +20,7 @@
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;

import javax.annotation.Nullable;

@@ -125,4 +126,13 @@ public org.apache.flink.table.api.DataTypes.Field toFlinkDataTypeField() {
: org.apache.flink.table.api.DataTypes.FIELD(
name, DataTypeUtils.toFlinkDataType(type), description);
}

public static DataField fromFlinkDataTypeField(
org.apache.flink.table.types.logical.RowType.RowField rowField) {
return DataTypes.FIELD(
rowField.getName(),
DataTypeUtils.fromFlinkDataType(
LogicalTypeDataTypeConverter.toDataType(rowField.getType())),
rowField.getDescription().orElse(null));
}
}
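
For reference, a minimal sketch of the new factory in use (a sketch, assuming flink-cdc-common and Flink's table types on the classpath; the field name and comment are invented illustration values):

import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;

public class DataFieldRoundTrip {
    public static void main(String[] args) {
        // A Flink RowField with an explicit NOT NULL type and a description.
        RowType.RowField flinkField =
                new RowType.RowField("order_id", new BigIntType(false), "synthetic key");
        // Name and description carry over; the type goes through
        // LogicalTypeDataTypeConverter and DataTypeUtils.fromFlinkDataType.
        DataField cdcField = DataField.fromFlinkDataTypeField(flinkField);
        System.out.println(cdcField); // roughly: `order_id` BIGINT NOT NULL 'synthetic key'
    }
}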

DataTypeUtils.java
@@ -29,11 +29,16 @@
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.CollectionUtil;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;

/** Utilities for handling {@link DataType}s. */
public class DataTypeUtils {
/**
@@ -197,4 +202,101 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType type) {
throw new IllegalArgumentException("Illegal type: " + type);
}
}

/**
* Convert Flink's internal {@link org.apache.flink.table.types.DataType} to CDC's {@link
* DataType}.
*/
public static DataType fromFlinkDataType(org.apache.flink.table.types.DataType flinkType) {
LogicalType logicalType = flinkType.getLogicalType();
List<org.apache.flink.table.types.DataType> children = flinkType.getChildren();
DataType dataType;
switch (logicalType.getTypeRoot()) {
case CHAR:
dataType = DataTypes.CHAR(getLength(logicalType));
break;
case VARCHAR:
dataType = DataTypes.VARCHAR(getLength(logicalType));
break;
case BOOLEAN:
dataType = DataTypes.BOOLEAN();
break;
case BINARY:
dataType = DataTypes.BINARY(getLength(logicalType));
break;
case VARBINARY:
dataType = DataTypes.VARBINARY(getLength(logicalType));
break;
case DECIMAL:
dataType = DataTypes.DECIMAL(getPrecision(logicalType), getScale(logicalType));
break;
case TINYINT:
dataType = DataTypes.TINYINT();
break;
case SMALLINT:
dataType = DataTypes.SMALLINT();
break;
case INTEGER:
dataType = DataTypes.INT();
break;
case BIGINT:
dataType = DataTypes.BIGINT();
break;
case FLOAT:
dataType = DataTypes.FLOAT();
break;
case DOUBLE:
dataType = DataTypes.DOUBLE();
break;
case DATE:
dataType = DataTypes.DATE();
break;
case TIME_WITHOUT_TIME_ZONE:
dataType = DataTypes.TIME(getPrecision(logicalType));
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
dataType = DataTypes.TIMESTAMP(getPrecision(logicalType));
break;
case TIMESTAMP_WITH_TIME_ZONE:
dataType = DataTypes.TIMESTAMP_TZ(getPrecision(logicalType));
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
dataType = DataTypes.TIMESTAMP_LTZ(getPrecision(logicalType));
break;
case ARRAY:
Preconditions.checkState(children != null && !children.isEmpty());
dataType = DataTypes.ARRAY(fromFlinkDataType(children.get(0)));
break;
case MAP:
Preconditions.checkState(children != null && children.size() > 1);
dataType =
DataTypes.MAP(
fromFlinkDataType(children.get(0)),
fromFlinkDataType(children.get(1)));
break;
case ROW:
Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
org.apache.flink.table.types.logical.RowType rowType =
(org.apache.flink.table.types.logical.RowType) logicalType;
DataField[] fields =
rowType.getFields().stream()
.map(DataField::fromFlinkDataTypeField)
.toArray(DataField[]::new);
dataType = DataTypes.ROW(fields);
break;
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
case NULL:
case MULTISET:
case DISTINCT_TYPE:
case STRUCTURED_TYPE:
case RAW:
case SYMBOL:
case UNRESOLVED:
throw new IllegalArgumentException("Unsupported type: " + flinkType);
default:
throw new IllegalArgumentException("Illegal type: " + flinkType);
}
return logicalType.isNullable() ? dataType : dataType.notNull();
}
}
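
A quick sketch of the conversion in action (assuming flink-cdc-common and the Flink table API on the classpath; the demo class name is invented):

import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.table.api.DataTypes;

public class FromFlinkDataTypeDemo {
    public static void main(String[] args) {
        // NOT NULL on the Flink side survives via the trailing notNull() call.
        System.out.println(DataTypeUtils.fromFlinkDataType(DataTypes.DECIMAL(10, 2).notNull()));
        // Composite types recurse through the children list (ARRAY, MAP, ROW).
        System.out.println(
                DataTypeUtils.fromFlinkDataType(
                        DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))));
    }
}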

PaimonMetadataApplier.java
@@ -294,9 +294,9 @@ private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException {
event.getDroppedColumnNames()
.forEach((column) -> tableChangeList.addAll(SchemaChangeProvider.drop(column)));
catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
| Catalog.ColumnNotExistException e) {
} catch (Catalog.TableNotExistException | Catalog.ColumnNotExistException e) {
LOG.warn("Failed to apply DropColumnEvent, skip it.", e);
} catch (Catalog.ColumnAlreadyExistException e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}

PaimonWriterHelper.java
@@ -25,30 +25,42 @@
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.table.types.utils.TypeConversions;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount;

/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */
/**
* A helper class to deduce the {@link Schema} of a Paimon table for {@link BucketAssignOperator},
* and to create FieldGetter and GenericRow instances for {@link PaimonWriter}.
*/
public class PaimonWriterHelper {

/** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */
@@ -61,6 +73,31 @@ public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
return fieldGetters;
}

/**
* Check whether the columns of the upstream schema are identical to those of the physical
* schema.
*
* <p>Note: column comments and default values are ignored, as they have no influence on
* {@link #createFieldGetter(DataType, int, ZoneId)}.
*/
public static boolean sameColumnsIgnoreCommentAndDefaultValue(
Schema upstreamSchema, Schema physicalSchema) {
List<Column> upstreamColumns = upstreamSchema.getColumns();
List<Column> physicalColumns = physicalSchema.getColumns();
if (upstreamColumns.size() != physicalColumns.size()) {
return false;
}
for (int i = 0; i < physicalColumns.size(); i++) {
Column upstreamColumn = upstreamColumns.get(i);
Column physicalColumn = physicalColumns.get(i);
// Case sensitive.
if (!upstreamColumn.getName().equals(physicalColumn.getName())
|| !upstreamColumn.getType().equals(physicalColumn.getType())) {
return false;
}
}
return true;
}
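
An illustrative check (a sketch, assuming the Schema builder API from flink-cdc-common and that PaimonWriterHelper lives in org.apache.flink.cdc.connectors.paimon.sink.v2; the column names are invented):

import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;

public class SameColumnsDemo {
    public static void main(String[] args) {
        Schema upstream =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("name", DataTypes.VARCHAR(32))
                        .build();
        Schema physical =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT(), "primary key") // comment differs
                        .physicalColumn("name", DataTypes.VARCHAR(32))
                        .build();
        // Comments and default values are ignored, so this prints true; renaming
        // "id" to "ID" or widening INT to BIGINT would flip it to false.
        System.out.println(
                PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue(upstream, physical));
    }
}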

private static RecordData.FieldGetter createFieldGetter(
DataType fieldType, int fieldPos, ZoneId zoneId) {
final RecordData.FieldGetter fieldGetter;
@@ -215,6 +252,32 @@ public static List<GenericRow> convertEventToFullGenericRows(
return fullGenericRows;
}

/**
* Deduce a {@link Schema} for a Paimon {@link Table}.
*
* <p>Note: column default values are not included in the result.
*/
public static Schema deduceSchemaForPaimonTable(Table table) {
RowType rowType = table.rowType();
Schema.Builder builder = Schema.newBuilder();
builder.setColumns(
rowType.getFields().stream()
.map(
column ->
Column.physicalColumn(
column.name(),
DataTypeUtils.fromFlinkDataType(
TypeConversions.fromLogicalToDataType(
LogicalTypeConversion.toLogicalType(
column.type()))),
column.description()))
.collect(Collectors.toList()));
builder.primaryKey(table.primaryKeys());
table.comment().ifPresent(builder::comment);
builder.options(table.options());
return builder.build();
}
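
deduceSchemaForPaimonTable needs a live Table handle, but the per-field type mapping it applies can be sketched in isolation (a sketch, assuming paimon-flink and flink-cdc-common on the classpath; the demo class name is invented):

import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.paimon.flink.LogicalTypeConversion;

public class PaimonFieldMappingDemo {
    public static void main(String[] args) {
        // One field's journey: Paimon type -> Flink logical type -> Flink DataType -> CDC type.
        org.apache.paimon.types.DataType paimonType =
                org.apache.paimon.types.DataTypes.DECIMAL(10, 2);
        org.apache.flink.cdc.common.types.DataType cdcType =
                DataTypeUtils.fromFlinkDataType(
                        TypeConversions.fromLogicalToDataType(
                                LogicalTypeConversion.toLogicalType(paimonType)));
        System.out.println(cdcType); // expected: DECIMAL(10, 2)
    }
}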

private static GenericRow convertRecordDataToGenericRow(
RecordData recordData, List<RecordData.FieldGetter> fieldGetters, RowKind rowKind) {
GenericRow genericRow = new GenericRow(rowKind, recordData.getArity());
@@ -340,4 +403,8 @@ public InternalRow readRowData(
return row;
}
}

public static Identifier identifierFromTableId(TableId tableId) {
return Identifier.fromString(tableId.identifier());
}
}
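
And a one-liner for the new identifier helper (same classpath assumptions as above; database and table names are invented):

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
import org.apache.paimon.catalog.Identifier;

public class IdentifierDemo {
    public static void main(String[] args) {
        // TableId.tableId("mydb", "orders").identifier() renders "mydb.orders", which
        // Identifier.fromString splits back into database and object names.
        Identifier id = PaimonWriterHelper.identifierFromTableId(TableId.tableId("mydb", "orders"));
        System.out.println(id.getFullName()); // mydb.orders
    }
}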