Skip to content

Commit b874f21

Browse files
authored
[spark] Fix the column projection when writing by DataFrameWriter (#5164)
1 parent 8044eb3 commit b874f21

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,13 @@ case class PaimonSparkWriter(table: FileStoreTable) {
399399
}
400400

401401
private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
402-
val partitionCols = tableSchema.partitionKeys().asScala.map(col).toSeq
402+
val inputSchema = df.schema
403+
val partitionCols = tableSchema
404+
.partitionKeys()
405+
.asScala
406+
.map(tableSchema.fieldNames().indexOf(_))
407+
.map(x => col(inputSchema.fieldNames(x)))
408+
.toSeq
403409
df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
404410
}
405411

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,26 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
6161
Assertions.assertFalse(paimonTable.options().containsKey("write.merge-schema.explicit-cast"))
6262
}
6363

64+
test("Paimon: DataFrameWrite partition table") {
65+
withTable("t") {
66+
spark.sql(s"""
67+
|CREATE TABLE t (a INT, b STRING, dt STRING) PARTITIONED BY(dt)
68+
|TBLPROPERTIES ('file.format' = 'avro', 'bucket' = 2, 'bucket-key' = 'b')
69+
|""".stripMargin)
70+
71+
val table = loadTable("t")
72+
val location = table.location().toString
73+
74+
Seq((1, "x1", "a"), (2, "x2", "b"))
75+
.toDF("a", "b", "c")
76+
.write
77+
.format("paimon")
78+
.mode("append")
79+
.save(location)
80+
checkAnswer(sql("SELECT * FROM t"), Row(1, "x1", "a") :: Row(2, "x2", "b") :: Nil)
81+
}
82+
}
83+
6484
fileFormats.foreach {
6585
fileFormat =>
6686
test(s"Paimon: DataFrameWrite.saveAsTable in ByName mode, file.format: $fileFormat") {

0 commit comments

Comments
 (0)