Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Version: 0.8.0
Environment:
Hadoop 3.1.1
Spark 3.1.1
Hive 3.1.1
Description
When an incremental query spans multiple commits before and after a replacecommit, the query result contains data from the old (replaced) files. Note: MOR tables are fine; only COW tables have this problem.
When the incremental view of a COW table is queried, the replacecommit is ignored, which leads to the wrong result.
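The effect can be illustrated with a small toy model. This is only a sketch of one reading of the description, not Hudi's implementation: `FileGroup`, `Commit`, `ReplaceCommit`, and both `read...` functions are hypothetical names introduced here, and the begin-exclusive, end-inclusive window is an assumption of the model.

```scala
// Toy model only: none of these types are part of Hudi's API.
final case class FileGroup(id: String, keys: Seq[Int])

sealed trait Instant { def time: String }
final case class Commit(time: String, written: Seq[FileGroup]) extends Instant
final case class ReplaceCommit(time: String,
                               written: Seq[FileGroup],
                               replacedIds: Set[String]) extends Instant

object IncrementalModel {
  // Assumed window semantics: begin is exclusive, end is inclusive.
  private def inRange(i: Instant, begin: String, end: String): Boolean =
    i.time.compareTo(begin) > 0 && i.time.compareTo(end) <= 0

  private def written(i: Instant): Seq[FileGroup] = i match {
    case Commit(_, w)           => w
    case ReplaceCommit(_, w, _) => w
  }

  // Behaviour described in the report: file groups replaced by a
  // replacecommit are still read, so old and new rows both show up.
  def readIgnoringReplace(timeline: Seq[Instant], begin: String, end: String): Seq[Int] =
    timeline.filter(inRange(_, begin, end)).flatMap(written).flatMap(_.keys).sorted

  // Expected behaviour: drop every file group that a replacecommit
  // inside the window marks as replaced.
  def readHonoringReplace(timeline: Seq[Instant], begin: String, end: String): Seq[Int] = {
    val inWindow = timeline.filter(inRange(_, begin, end))
    val replaced = inWindow.collect { case r: ReplaceCommit => r.replacedIds }.flatten.toSet
    inWindow.flatMap(written).filterNot(fg => replaced.contains(fg.id)).flatMap(_.keys).sorted
  }
}
```

With a timeline of one `Commit` writing keys 0 to 9 followed by a `ReplaceCommit` that rewrites the same keys and replaces the first file group, `readIgnoringReplace` returns every key twice, while `readHonoringReplace` returns each key once.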
Test steps:
Step 1: create a DataFrame
import org.apache.spark.sql.functions.{expr, lit}
val df = spark.range(0, 10).toDF("keyid")
  .withColumn("col3", expr("keyid"))
  .withColumn("age", lit(1))
  .withColumn("p", lit(2))
Step 2: insert df into an empty Hudi table
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
df.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "")
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert")
  .option("hoodie.insert.shuffle.parallelism", "4")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .mode(SaveMode.Overwrite)
  .save(basePath)
Step 3: write the same df again with insert_overwrite_table
df.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "")
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert_overwrite_table")
  .option("hoodie.insert.shuffle.parallelism", "4")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .mode(SaveMode.Append)
  .save(basePath)
Step 4: run an incremental query on the table
import org.apache.hudi.DataSourceReadOptions
// Note: currentCommits is not defined in this report; it is used as given.
spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "0000")
  .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, currentCommits(0))
  .load(basePath)
  .select("keyid").orderBy("keyid")
  .show(100, false)
Result: the output contains the old data; every keyid appears twice.
+-----+
|keyid|
+-----+
|0    |
|0    |
|1    |
|1    |
|2    |
|2    |
|3    |
|3    |
|4    |
|4    |
|5    |
|5    |
|6    |
|6    |
|7    |
|7    |
|8    |
|8    |
|9    |
|9    |
+-----+
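The symptom above can also be checked programmatically. The following is a minimal sketch, assuming the keyid values of the query result have already been collected to the driver as a plain sequence; `DuplicateCheck` and `duplicateKeys` are hypothetical helper names, not part of Hudi or Spark.

```scala
object DuplicateCheck {
  // Returns each key that occurs more than once, together with its
  // occurrence count. An empty map means no key was read twice.
  def duplicateKeys(keys: Seq[Long]): Map[Long, Int] =
    keys.groupBy(identity).collect {
      case (key, occurrences) if occurrences.size > 1 => key -> occurrences.size
    }
}
```

For the output shown in this report, every key from 0 to 9 would be reported with a count of 2. After step 3 overwrote the table with the same ten rows, a correct incremental read would be expected to yield an empty map.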