Skip to content

Commit f32f02d

Browse files
authored
[spark] Support MERGE for row lineage (#6045)
1 parent a70a650 commit f32f02d

File tree

4 files changed

+151
-46
lines changed

4 files changed

+151
-46
lines changed

docs/content/append-table/row-tracking.md

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ WITH ('row-tracking.enabled' = 'true');
4646
```
4747
Notice that:
4848
- Row tracking is only supported for unaware append tables, not for primary key tables. Which means you can't define `bucket` and `bucket-key` for the table.
49-
- Only spark support update and merge into (not yet) operations on row-tracking tables, Flink SQL does not support these operations yet.
49+
- Only spark support update and merge into operations on row-tracking tables, Flink SQL does not support these operations yet.
5050
- This function is experimental, this line will be removed after being stable.
5151

5252
## How to use row tracking
@@ -62,7 +62,7 @@ You can select the row lineage meta column with the following sql in spark:
6262
SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
6363
```
6464
You will get the following result:
65-
```sql
65+
```text
6666
+---+----+-------+----------------+
6767
| id|data|_ROW_ID|_SEQUENCE_NUMBER|
6868
+---+----+-------+----------------+
@@ -78,13 +78,32 @@ SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
7878
```
7979

8080
You will get:
81-
```sql
81+
```text
8282
+---+---------------+-------+----------------+
8383
| id| data|_ROW_ID|_SEQUENCE_NUMBER|
8484
+---+---------------+-------+----------------+
8585
| 22| b| 1| 1|
8686
| 11|new-data-update| 0| 2|
87-
+---+---------------+-------+----------------+
87+
+---+---------------+-------+----------------+
88+
```
89+
90+
You can also merge into the table, suppose you have a source table `s` that contains (22, 'new-data-merge') and (33, 'c'):
91+
```sql
92+
MERGE INTO t USING s
93+
ON t.id = s.id
94+
WHEN MATCHED THEN UPDATE SET t.data = s.data
95+
WHEN NOT MATCHED THEN INSERT *
96+
```
97+
98+
You will get:
99+
```text
100+
+---+---------------+-------+----------------+
101+
| id| data|_ROW_ID|_SEQUENCE_NUMBER|
102+
+---+---------------+-------+----------------+
103+
| 11|new-data-update| 0| 2|
104+
| 22| new-data-merge| 1| 3|
105+
| 33| c| 2| 3|
106+
+---+---------------+-------+----------------+
88107
```
89108

90109
## Spec

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala

Lines changed: 68 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
2424
import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns}
2525
import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, FILE_PATH_COLUMN, ROW_INDEX, ROW_INDEX_COLUMN}
2626
import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
27-
import org.apache.paimon.table.FileStoreTable
27+
import org.apache.paimon.table.{FileStoreTable, SpecialFields}
2828
import org.apache.paimon.table.sink.CommitMessage
2929
import org.apache.paimon.types.RowKind
3030

@@ -114,14 +114,10 @@ case class MergeIntoPaimonTable(
114114
sparkSession,
115115
filteredRelation,
116116
remainDeletedRow = true,
117-
metadataCols = metadataCols)
117+
extraMetadataCols = metadataCols)
118118

119119
ds.cache()
120120
try {
121-
val rowKindAttribute = ds.queryExecution.analyzed.output
122-
.find(attr => sparkSession.sessionState.conf.resolver(attr.name, ROW_KIND_COL))
123-
.getOrElse(throw new RuntimeException("Can not find _row_kind_ column."))
124-
125121
// Step3: filter rows that should be marked as DELETED in Deletion Vector mode.
126122
val dvDS = ds.where(
127123
s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}")
@@ -141,8 +137,10 @@ case class MergeIntoPaimonTable(
141137
ds.unpersist()
142138
}
143139
} else {
144-
val touchedFilePathsSet = mutable.Set.empty[String]
145-
val intersectionFilePaths = mutable.Set.empty[String]
140+
// Files need to be rewritten
141+
val filePathsToRewritten = mutable.Set.empty[String]
142+
// Files need to be read, but not rewritten
143+
val filePathsToRead = mutable.Set.empty[String]
146144

147145
def hasUpdate(actions: Seq[MergeAction]): Boolean = {
148146
actions.exists {
@@ -159,39 +157,44 @@ case class MergeIntoPaimonTable(
159157
}
160158

161159
if (hasUpdate(matchedActions)) {
162-
touchedFilePathsSet ++= findTouchedFiles0("inner")
160+
filePathsToRewritten ++= findTouchedFiles0("inner")
163161
} else if (notMatchedActions.nonEmpty) {
164-
intersectionFilePaths ++= findTouchedFiles0("inner")
162+
filePathsToRead ++= findTouchedFiles0("inner")
165163
}
166164

167165
if (hasUpdate(notMatchedBySourceActions)) {
168-
touchedFilePathsSet ++= findTouchedFiles0("left_anti")
169-
}
170-
171-
val touchedFilePaths: Array[String] = touchedFilePathsSet.toArray
172-
val unTouchedFilePaths = if (notMatchedActions.nonEmpty) {
173-
intersectionFilePaths.diff(touchedFilePathsSet).toArray
174-
} else {
175-
Array[String]()
166+
val noMatchedBySourceFilePaths = findTouchedFiles0("left_anti")
167+
filePathsToRewritten ++= noMatchedBySourceFilePaths
168+
filePathsToRead --= noMatchedBySourceFilePaths
176169
}
177170

178-
val (touchedFiles, touchedFileRelation) =
179-
createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
171+
val (filesToRewritten, touchedFileRelation) =
172+
createNewRelation(filePathsToRewritten.toArray, dataFilePathToMeta, relation)
180173
val (_, unTouchedFileRelation) =
181-
createNewRelation(unTouchedFilePaths, dataFilePathToMeta, relation)
174+
createNewRelation(filePathsToRead.toArray, dataFilePathToMeta, relation)
182175

183176
// Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been
184177
// modified and was from touched file, it should be kept too.
185-
val touchedDsWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
178+
val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
186179
.withColumn(FILE_TOUCHED_COL, lit(true))
187-
val targetDSWithFileTouchedCol = touchedDsWithFileTouchedCol.union(
188-
createDataset(sparkSession, unTouchedFileRelation)
180+
.union(createDataset(sparkSession, unTouchedFileRelation)
189181
.withColumn(FILE_TOUCHED_COL, lit(false)))
190182

191-
val toWriteDS =
192-
constructChangedRows(sparkSession, targetDSWithFileTouchedCol).drop(ROW_KIND_COL)
193-
val addCommitMessage = dvSafeWriter.write(toWriteDS)
194-
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
183+
// If no files need to be rewritten, no need to write row lineage
184+
val writeRowLineage = coreOptions.rowTrackingEnabled() && filesToRewritten.nonEmpty
185+
186+
val toWriteDS = constructChangedRows(
187+
sparkSession,
188+
targetDSWithFileTouchedCol,
189+
writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
190+
191+
val writer = if (writeRowLineage) {
192+
dvSafeWriter.withRowLineage()
193+
} else {
194+
dvSafeWriter
195+
}
196+
val addCommitMessage = writer.write(toWriteDS)
197+
val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
195198

196199
addCommitMessage ++ deletedCommitMessage
197200
}
@@ -203,7 +206,8 @@ case class MergeIntoPaimonTable(
203206
targetDataset: Dataset[Row],
204207
remainDeletedRow: Boolean = false,
205208
deletionVectorEnabled: Boolean = false,
206-
metadataCols: Seq[PaimonMetadataColumn] = Seq.empty): Dataset[Row] = {
209+
extraMetadataCols: Seq[PaimonMetadataColumn] = Seq.empty,
210+
writeRowLineage: Boolean = false): Dataset[Row] = {
207211
val targetDS = targetDataset
208212
.withColumn(TARGET_ROW_COL, lit(true))
209213

@@ -217,25 +221,42 @@ case class MergeIntoPaimonTable(
217221
resolveExpressions(sparkSession)(exprs, joinedPlan)
218222
}
219223

220-
val targetOutput = filteredTargetPlan.output
221224
val targetRowNotMatched = resolveOnJoinedPlan(
222225
Seq(toExpression(sparkSession, col(SOURCE_ROW_COL).isNull))).head
223226
val sourceRowNotMatched = resolveOnJoinedPlan(
224227
Seq(toExpression(sparkSession, col(TARGET_ROW_COL).isNull))).head
225228
val matchedExprs = matchedActions.map(_.condition.getOrElse(TrueLiteral))
226229
val notMatchedExprs = notMatchedActions.map(_.condition.getOrElse(TrueLiteral))
227230
val notMatchedBySourceExprs = notMatchedBySourceActions.map(_.condition.getOrElse(TrueLiteral))
228-
val noopOutput = targetOutput :+ Alias(Literal(NOOP_ROW_KIND_VALUE), ROW_KIND_COL)()
229-
val keepOutput = targetOutput :+ Alias(Literal(RowKind.INSERT.toByteValue), ROW_KIND_COL)()
230231

231232
val resolver = sparkSession.sessionState.conf.resolver
232-
val metadataAttributes = metadataCols.flatMap {
233-
metadataCol => joinedPlan.output.find(attr => resolver(metadataCol.name, attr.name))
233+
def attribute(name: String) = joinedPlan.output.find(attr => resolver(name, attr.name))
234+
val extraMetadataAttributes =
235+
extraMetadataCols.flatMap(metadataCol => attribute(metadataCol.name))
236+
val (rowIdAttr, sequenceNumberAttr) = if (writeRowLineage) {
237+
(
238+
attribute(SpecialFields.ROW_ID.name()).get,
239+
attribute(SpecialFields.SEQUENCE_NUMBER.name()).get)
240+
} else {
241+
(null, null)
242+
}
243+
244+
val targetOutput = if (writeRowLineage) {
245+
filteredTargetPlan.output ++ Seq(rowIdAttr, sequenceNumberAttr)
246+
} else {
247+
filteredTargetPlan.output
234248
}
249+
val noopOutput = targetOutput :+ Alias(Literal(NOOP_ROW_KIND_VALUE), ROW_KIND_COL)()
250+
val keepOutput = targetOutput :+ Alias(Literal(RowKind.INSERT.toByteValue), ROW_KIND_COL)()
251+
235252
def processMergeActions(actions: Seq[MergeAction]): Seq[Seq[Expression]] = {
236253
val columnExprs = actions.map {
237254
case UpdateAction(_, assignments) =>
238-
assignments.map(_.value) :+ Literal(RowKind.UPDATE_AFTER.toByteValue)
255+
var exprs = assignments.map(_.value)
256+
if (writeRowLineage) {
257+
exprs ++= Seq(rowIdAttr, Literal(null))
258+
}
259+
exprs :+ Literal(RowKind.UPDATE_AFTER.toByteValue)
239260
case DeleteAction(_) =>
240261
if (remainDeletedRow || deletionVectorEnabled) {
241262
targetOutput :+ Literal(RowKind.DELETE.toByteValue)
@@ -245,17 +266,26 @@ case class MergeIntoPaimonTable(
245266
noopOutput
246267
}
247268
case InsertAction(_, assignments) =>
248-
assignments.map(_.value) :+ Literal(RowKind.INSERT.toByteValue)
269+
var exprs = assignments.map(_.value)
270+
if (writeRowLineage) {
271+
exprs ++= Seq(rowIdAttr, sequenceNumberAttr)
272+
}
273+
exprs :+ Literal(RowKind.INSERT.toByteValue)
249274
}
250-
columnExprs.map(exprs => exprs ++ metadataAttributes)
275+
276+
columnExprs.map(exprs => exprs ++ extraMetadataAttributes)
251277
}
252278

253279
val matchedOutputs = processMergeActions(matchedActions)
254280
val notMatchedBySourceOutputs = processMergeActions(notMatchedBySourceActions)
255281
val notMatchedOutputs = processMergeActions(notMatchedActions)
256282
val outputFields = mutable.ArrayBuffer(tableSchema.fields: _*)
283+
if (writeRowLineage) {
284+
outputFields += PaimonMetadataColumn.ROW_ID.toStructField
285+
outputFields += PaimonMetadataColumn.SEQUENCE_NUMBER.toStructField
286+
}
257287
outputFields += StructField(ROW_KIND_COL, ByteType)
258-
outputFields ++= metadataCols.map(_.toStructField)
288+
outputFields ++= extraMetadataCols.map(_.toStructField)
259289
val outputSchema = StructType(outputFields.toSeq)
260290

261291
val joinedRowEncoder = EncoderUtils.encode(joinedPlan.schema)

paimon-spark/paimon-spark-ut/pom.xml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,19 @@ under the License.
4040
<dependencies>
4141

4242
<dependency>
43-
<groupId>org.codehaus.janino</groupId>
44-
<artifactId>janino</artifactId>
45-
<version>${janino.test.version}</version>
43+
<groupId>org.apache.paimon</groupId>
44+
<artifactId>paimon-common</artifactId>
45+
<version>${project.version}</version>
4646
<scope>test</scope>
4747
</dependency>
48+
4849
<dependency>
4950
<groupId>org.apache.paimon</groupId>
50-
<artifactId>paimon-common</artifactId>
51+
<artifactId>paimon-core</artifactId>
5152
<version>${project.version}</version>
5253
<scope>test</scope>
5354
</dependency>
55+
5456
<dependency>
5557
<groupId>org.apache.paimon</groupId>
5658
<artifactId>${paimon-sparkx-common}</artifactId>
@@ -218,6 +220,13 @@ under the License.
218220
</exclusions>
219221
<scope>test</scope>
220222
</dependency>
223+
224+
<dependency>
225+
<groupId>org.codehaus.janino</groupId>
226+
<artifactId>janino</artifactId>
227+
<version>${janino.test.version}</version>
228+
<scope>test</scope>
229+
</dependency>
221230
</dependencies>
222231

223232
<build>

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,51 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
8181
)
8282
}
8383
}
84+
85+
test("Row Lineage: merge into table") {
86+
withTable("s", "t") {
87+
sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
88+
sql("INSERT INTO s VALUES (1, 11), (2, 22)")
89+
90+
sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
91+
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)")
92+
checkAnswer(
93+
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
94+
Seq(Row(2, 2, 0, 1), Row(3, 3, 1, 1))
95+
)
96+
97+
sql("""
98+
|MERGE INTO t
99+
|USING s
100+
|ON t.id = s.id
101+
|WHEN MATCHED THEN UPDATE SET t.b = s.b
102+
|WHEN NOT MATCHED THEN INSERT *
103+
|""".stripMargin)
104+
checkAnswer(
105+
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
106+
Seq(Row(1, 11, 2, 2), Row(2, 22, 0, 2), Row(3, 3, 1, 1))
107+
)
108+
}
109+
}
110+
111+
test("Row Lineage: merge into table with only insert") {
112+
withTable("s", "t") {
113+
sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
114+
sql("INSERT INTO s VALUES (1, 11), (2, 22)")
115+
116+
sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
117+
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)")
118+
119+
sql("""
120+
|MERGE INTO t
121+
|USING s
122+
|ON t.id = s.id
123+
|WHEN NOT MATCHED THEN INSERT *
124+
|""".stripMargin)
125+
checkAnswer(
126+
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
127+
Seq(Row(1, 11, 2, 2), Row(2, 2, 0, 1), Row(3, 3, 1, 1))
128+
)
129+
}
130+
}
84131
}

0 commit comments

Comments
 (0)