Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 0.8.0
- Environment: hadoop 3.1.1, spark 3.1.1 / spark 2.4.5, hive 3.1.1
Description
When log files exist in a MOR table and clustering is triggered, the query result shows that the update records in the log files are lost.
The reason for this problem: Hudi uses HoodieFileSliceReader to read the table data and then performs clustering. HoodieFileSliceReader calls HoodieMergedLogRecordScanner.processNextRecord to merge the update values with the old values, but in that call the old value is kept and the update value is discarded, which is wrong.
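For illustration only, a minimal sketch of the intended merge semantics (the Rec type, orderingVal field, and mergedRecords map below are hypothetical stand-ins, not the actual Hudi classes): when a record with the same key is already buffered, the merge should keep the newer record (or pre-combine the payloads) rather than unconditionally keeping the old one, which is what causes the lost update here.
// Hypothetical, simplified stand-in for the merge done while scanning a file slice.
case class Rec(key: String, orderingVal: Long, age: Int)

val mergedRecords = scala.collection.mutable.Map[String, Rec]()

def processNextRecord(incoming: Rec): Unit = {
  mergedRecords.get(incoming.key) match {
    case Some(existing) =>
      // Buggy behaviour: keeping `existing` drops the update coming from the log file.
      // Intended behaviour: keep the record with the higher ordering (precombine) value.
      val winner = if (incoming.orderingVal >= existing.orderingVal) incoming else existing
      mergedRecords.update(incoming.key, winner)
    case None =>
      mergedRecords.update(incoming.key, incoming)
  }
}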
Test steps:
// required imports (assumes spark-shell with Hudi 0.8.0 on the classpath; basePath points to the table location)
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{expr, lit}

// step1: create a Hudi MOR table with 1000 records (age = 1)
val df = spark.range(0, 1000).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", lit(1))
.withColumn("p", lit(2))
df.write.format("hudi").
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
option(PRECOMBINE_FIELD_OPT_KEY, "col3").
option(RECORDKEY_FIELD_OPT_KEY, "keyid").
option(PARTITIONPATH_FIELD_OPT_KEY, "p").
option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert").
option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
option("hoodie.insert.shuffle.parallelism", "4").
option("hoodie.upsert.shuffle.parallelism", "4").
option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.mode(SaveMode.Overwrite).save(basePath)
// step2: update age (to 1001) for keyid < 5 to produce log files
val df1 = spark.range(0, 5).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", lit(1 + 1000))
.withColumn("p", lit(2))
df1.write.format("hudi").
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
option(PRECOMBINE_FIELD_OPT_KEY, "col3").
option(RECORDKEY_FIELD_OPT_KEY, "keyid").
option(PARTITIONPATH_FIELD_OPT_KEY, "p").
option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
option("hoodie.insert.shuffle.parallelism", "4").
option("hoodie.upsert.shuffle.parallelism", "4").
option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.mode(SaveMode.Append).save(basePath)
// step3: upsert new records (keyid 6..9) and trigger inline clustering
val df2 = spark.range(6, 10).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", lit(1 + 2000))
.withColumn("p", lit(2))
df2.write.format("hudi").
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
option(PRECOMBINE_FIELD_OPT_KEY, "col3").
option(RECORDKEY_FIELD_OPT_KEY, "keyid").
option(PARTITIONPATH_FIELD_OPT_KEY, "p").
option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
option("hoodie.insert.shuffle.parallelism", "4").
option("hoodie.upsert.shuffle.parallelism", "4").
option("hoodie.parquet.small.file.limit", "0").
option("hoodie.clustering.inline", "true").
option("hoodie.clustering.inline.max.commits", "1").
option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.mode(SaveMode.Append).save(basePath)
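// step4: query the table after clustering; per step 2, age should be 1001 for keyid = 0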
spark.read.format("hudi")
.load(basePath).select("age").where("keyid = 0").show(100, false)
+---+
|age|
+---+
|1  |
+---+
The result is wrong, since we updated the value of age to 1001 in step 2.
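For reference, after the fix the same query should return the value written in step 2:
+----+
|age |
+----+
|1001|
+----+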