Skip to content

Commit 67b2b6b

Browse files
committed
[Flink] debezium-bson format using the id field in the Kafka Key as Update before information
1 parent 6c81807 commit 67b2b6b

File tree

9 files changed

+255
-86
lines changed

9 files changed

+255
-86
lines changed

docs/content/cdc-ingestion/debezium-bson.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ bson-*.jar
4747
{{< hint info >}}
4848
The debezium bson format requires that insert/update/delete event messages include the full document and a field that represents the state of the document before the change.
4949
This requires setting debezium's capture.mode to change_streams_update_full_with_pre_image and [capture.mode.full.update.type](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-capture-mode-full-update-type) to post_image.
50-
The database must be running **MongoDB 6.0 or later** to use this option.
50+
MongoDB versions earlier than 6.0 cannot provide 'Update Before' information. In that case, the id field in the Kafka key is used as the 'Update Before' information.
5151
{{< /hint >}}
5252

5353
Here is a simple example for an update operation captured from a MongoDB customers collection in JSON format:
@@ -145,7 +145,7 @@ Below is a list of top-level field BsonValue conversion examples:
145145
<td>
146146
<ul>
147147
<li>1735934393769</li>
148-
<li>{"$numberLong": 1735934393769}</li>
148+
<li>{"$numberLong": "1735934393769"}</li>
149149
</ul>
150150
</td>
151151
<td>"1735934393769"</td>
@@ -186,7 +186,7 @@ Below is a list of top-level field BsonValue conversion examples:
186186
</tr>
187187
<tr>
188188
<td><h5>BsonArray</h5></td>
189-
<td>[1,2,{"$numberLong": 1735934393769}]</td>
189+
<td>[1,2,{"$numberLong": "1735934393769"}]</td>
190190
<td>"[1,2,1735934393769]"</td>
191191
</tr>
192192
<tr>

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131

3232
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3333
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
34+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
35+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3436

3537
import org.bson.BsonDocument;
3638
import org.bson.BsonValue;
@@ -46,13 +48,16 @@
4648
import java.util.Map;
4749
import java.util.Objects;
4850

51+
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_BEFORE;
4952
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
5053
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
5154
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_TYPE;
5255
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_DELETE;
5356
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_INSERT;
5457
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_READE;
5558
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_UPDATE;
59+
import static org.apache.paimon.utils.JsonSerdeUtil.fromJson;
60+
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
5661
import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
5762

5863
/**
@@ -71,8 +76,11 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
7176

7277
private static final String FIELD_COLLECTION = "collection";
7378
private static final String FIELD_OBJECT_ID = "_id";
79+
private static final String FIELD_KEY_ID = "id";
7480
private static final List<String> PRIMARY_KEYS = Collections.singletonList(FIELD_OBJECT_ID);
7581

82+
private ObjectNode keyRoot;
83+
7684
public DebeziumBsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
7785
super(typeMapping, computedColumns);
7886
}
@@ -87,11 +95,11 @@ public List<RichCdcMultiplexRecord> extractRecords() {
8795
processRecord(getData(), RowKind.INSERT, records);
8896
break;
8997
case OP_UPDATE:
90-
processRecord(getBefore(operation), RowKind.DELETE, records);
98+
processDeleteRecord(operation, records);
9199
processRecord(getData(), RowKind.INSERT, records);
92100
break;
93101
case OP_DELETE:
94-
processRecord(getBefore(operation), RowKind.DELETE, records);
102+
processDeleteRecord(operation, records);
95103
break;
96104
default:
97105
throw new UnsupportedOperationException("Unknown record operation: " + operation);
@@ -101,11 +109,14 @@ public List<RichCdcMultiplexRecord> extractRecords() {
101109

102110
@Override
103111
protected void setRoot(CdcSourceRecord record) {
104-
JsonNode node = (JsonNode) record.getValue();
105-
if (node.has(FIELD_SCHEMA)) {
106-
root = node.get(FIELD_PAYLOAD);
107-
} else {
108-
root = node;
112+
root = (JsonNode) record.getValue();
113+
if (root.has(FIELD_SCHEMA)) {
114+
root = root.get(FIELD_PAYLOAD);
115+
}
116+
117+
keyRoot = (ObjectNode) record.getKey();
118+
if (!isNull(keyRoot) && keyRoot.has(FIELD_SCHEMA)) {
119+
keyRoot = (ObjectNode) keyRoot.get(FIELD_PAYLOAD);
109120
}
110121
}
111122

@@ -158,4 +169,37 @@ protected String getTableName() {
158169
protected String format() {
159170
return "debezium-bson";
160171
}
172+
173+
public boolean checkBeforeExists() {
174+
return !isNull(root.get(FIELD_BEFORE));
175+
}
176+
177+
private void processDeleteRecord(String operation, List<RichCdcMultiplexRecord> records) {
178+
if (checkBeforeExists()) {
179+
processRecord(getBefore(operation), RowKind.DELETE, records);
180+
} else {
181+
// Before version 6.0 of MongoDB, it was not possible to obtain 'Update Before'
182+
// information. Therefore, data is first deleted using the key 'id'
183+
JsonNode idNode = null;
184+
Preconditions.checkArgument(
185+
!isNull(keyRoot) && !isNull(idNode = keyRoot.get(FIELD_KEY_ID)),
186+
"Invalid %s format: missing '%s' field in key when '%s' is '%s' for: %s.",
187+
format(),
188+
FIELD_KEY_ID,
189+
FIELD_TYPE,
190+
operation,
191+
keyRoot);
192+
193+
// Deserialize id from json string to JsonNode
194+
Map<String, JsonNode> record =
195+
Collections.singletonMap(
196+
FIELD_OBJECT_ID, fromJson(idNode.asText(), JsonNode.class));
197+
198+
try {
199+
processRecord(new TextNode(writeValueAsString(record)), RowKind.DELETE, records);
200+
} catch (JsonProcessingException e) {
201+
throw new RuntimeException("Failed to deserialize key record.", e);
202+
}
203+
}
204+
}
161205
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,14 @@ public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message) throw
6464
}
6565

6666
try {
67-
return new CdcSourceRecord(objectMapper.readValue(message.value(), JsonNode.class));
67+
byte[] key = message.key();
68+
JsonNode keyNode = null;
69+
if (key != null) {
70+
keyNode = objectMapper.readValue(key, JsonNode.class);
71+
}
72+
73+
JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class);
74+
return new CdcSourceRecord(null, keyNode, valueNode);
6875
} catch (Exception e) {
6976
LOG.error("Invalid Json:\n{}", new String(message.value()));
7077
throw e;

0 commit comments

Comments
 (0)