Description
If the record level index (RLI) is used to evaluate a time travel query, incorrect results are returned. The issue can be reproduced with the following steps:
- Create a table and insert three records at each of three different instants by running the script below three times, with some time gap between runs.
//
// Scala script for creating a table with RLI
//
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath = "file:///Users/amrish/tables/travel"
val dataGen = new DataGenerator

// Generate inserts
val inserts = convertToStringList(dataGen.generateInserts(3))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))

// Write with the metadata table and the record level index enabled
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.metadata.enable", "true").
  option("hoodie.metadata.record.index.enable", "true").
  option("hoodie.enable.data.skipping", "true").
  option("hoodie.index.type", "RECORD_INDEX").
  option("hoodie.metadata.secondary.record.index.enable", "true").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.parquet.small.file.limit", "0").
  option("hoodie.compact.inline.max.delta.commits", "3").
  mode(Append).
  save(basePath)
- Run a select query, ordered by _hoodie_commit_time ascending, to see the data in the table.
//
// Select query
//
val readOpts = Map(
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.enable.data.skipping" -> "true",
  "hoodie.index.type" -> "RECORD_INDEX"
)

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  options(readOpts).
  load(basePath)

tripsSnapshotDF.createOrReplaceTempView("myrli")
spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
- The results of the select query above should look similar to the output below.
+-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                  |_hoodie_partition_path              |_hoodie_file_name                                                         |begin_lat          |begin_lon          |driver    |end_lat             |end_lon            |fare              |rider    |ts           |uuid                                |partitionpath                       |
+-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
|20230920163539097  |20230920163539097_0_0|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo           |4ab59ebf-9274-44d6-bc29-9fc3112038d3-0_0-33-348_20230920163539097.parquet |0.4726905879569653 |0.46157858450465483|driver-213|0.754803407008858   |0.9671159942018241 |34.158284716382845|rider-213|1695016681714|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo           |
|20230920163539097  |20230920163539097_0_1|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo           |4ab59ebf-9274-44d6-bc29-9fc3112038d3-0_0-33-348_20230920163539097.parquet |0.6100070562136587 |0.8779402295427752 |driver-213|0.3407870505929602  |0.5030798142293655 |43.4923811219014  |rider-213|1694997420535|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo           |
|20230920163539097  |20230920163539097_1_0|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|1347f595-c7f8-446b-bd49-344b514ea503-0_1-33-349_20230920163539097.parquet |0.5731835407930634 |0.4923479652912024 |driver-213|0.08988581780930216 |0.42520899698713666|64.27696295884016 |rider-213|1695175828360|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|
|20230920164025402  |20230920164025402_0_1|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo           |6b7bd997-983e-41e9-ae02-8896f13fd083-0_0-80-419_20230920164025402.parquet |0.1762368947074756 |0.7942627821413218 |driver-226|0.22400157419609057 |0.08079517477095832|87.42041526408588 |rider-226|1695061974993|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo           |
|20230920164025402  |20230920164025402_0_0|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo           |6b7bd997-983e-41e9-ae02-8896f13fd083-0_0-80-419_20230920164025402.parquet |0.36519521355305173|0.9888075495133515 |driver-226|0.013401540991535565|0.3794482769934313 |18.56488085068272 |rider-226|1695131524692|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo           |
|20230920164025402  |20230920164025402_1_0|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|cb40f7c0-6c4a-425a-80b2-159f92d30ec9-0_1-80-420_20230920164025402.parquet |0.6220454661413275 |0.72024792576853   |driver-226|0.9048755755365163  |0.727695054518325  |40.613510977307   |rider-226|1694672409324|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|
|20230920164221116  |20230920164221116_1_0|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai                  |24ef5d9d-f132-435d-aacc-ecb2099504fe-0_1-118-493_20230920164221116.parquet|0.06224031095826987|0.4106290929046368 |driver-913|0.964603455586492   |0.13957566957654388|45.40019146422721 |rider-913|1694676094520|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai                  |
|20230920164221116  |20230920164221116_0_1|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo           |c8e98b87-4b87-4200-a018-d6b596f13538-0_0-118-492_20230920164221116.parquet|0.25252652214479043|0.33922164839486424|driver-913|0.909372837469859   |0.9017656600243008 |82.36411667430927 |rider-913|1694663124830|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo           |
|20230920164221116  |20230920164221116_0_0|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo           |c8e98b87-4b87-4200-a018-d6b596f13538-0_0-118-492_20230920164221116.parquet|0.6346040067610669 |0.6662084366450246 |driver-913|0.9065078444936647  |0.7124299678100179 |5.336723040266267 |rider-913|1695098874341|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo           |
+-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
- Note that each commit instant (_hoodie_commit_time) has three records. Now delete one record from the middle instant (20230920164025402). We use uuid to identify the record to delete, as shown below.
//
// Delete one record (hard delete)
//
val dataset = spark.sql("select * from myrli where uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").limit(1)
val deletes = dataGen.generateDeletes(dataset.collectAsList())
val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

deleteDf.write.format("hudi").
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
- Run the select query from step (2) above again to confirm that the record was deleted.
- Now run the same select query with the time travel option, followed by a second query that filters on the deleted uuid, as shown below.
//
// Time travel query
//
val readOpts = Map(
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.enable.data.skipping" -> "true",
  "hoodie.index.type" -> "RECORD_INDEX",
  "as.of.instant" -> "20230920164025402"
)

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  options(readOpts).
  load(basePath)

tripsSnapshotDF.createOrReplaceTempView("myrli")
spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
spark.sql("select * from myrli WHERE uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").show(false)
- Note that the first query returns six records, including the record with uuid 097e5763-e19f-4820-9c8e-808aba60e3ff that we deleted in step 4. This is correct, because the result reflects the table as of instant 20230920164025402 rather than the latest instant. However, the second query returns no results, even though it also runs over the dataset as of instant 20230920164025402. The second query returns incorrect results because it uses the latest RLI to evaluate a query at a past instant.
The bug is that RLI cannot be used to evaluate a time travel query at a past instant, because RLI maintains only the latest state. As a fix, we should avoid using RLI when the "as.of.instant" option is set.
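The proposed guard can be sketched in plain Scala as follows. This is only an illustration of the decision logic, not Hudi's actual code: the object RliGuard and the method canUseRecordIndex are hypothetical names, and the only inputs assumed are the read option keys already used in the queries above.

```scala
// Hypothetical sketch of the proposed fix; RliGuard and canUseRecordIndex
// are illustrative names and are not part of the Hudi code base.
object RliGuard {
  // Option keys taken from the read options used in this report.
  val AsOfInstantKey = "as.of.instant"
  val RecordIndexEnableKey = "hoodie.metadata.record.index.enable"

  /** Returns true only if RLI is enabled and the read is not a time travel query. */
  def canUseRecordIndex(readOpts: Map[String, String]): Boolean = {
    val rliEnabled =
      readOpts.get(RecordIndexEnableKey).exists(_.trim.equalsIgnoreCase("true"))
    // RLI tracks only the latest state, so any as-of instant rules it out.
    val isTimeTravel = readOpts.get(AsOfInstantKey).exists(_.trim.nonEmpty)
    rliEnabled && !isTimeTravel
  }
}

// Snapshot read: RLI may be used.
val snapshotOpts = Map("hoodie.metadata.record.index.enable" -> "true")
println(RliGuard.canUseRecordIndex(snapshotOpts))

// Time travel read: RLI must be skipped.
val timeTravelOpts = snapshotOpts + ("as.of.instant" -> "20230920164025402")
println(RliGuard.canUseRecordIndex(timeTravelOpts))
```

With a check of this kind in the read path, the second time travel query would fall back to a lookup that does not use the index and should return the record that existed at instant 20230920164025402.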