Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
- Environment: On AWS EMR
Description
Code example to reproduce:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit  // needed for lit() below; auto-imported in spark-shell

val df = Seq(
  ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
  ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")

var tableName = "hudi_events_mor_1"
var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName

// write hudi dataset
df.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Overwrite)
  .save(tablePath)

// update a record with event_name "event_name_123" => "event_name_changed"
val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
val df2 = df1.filter($"event_id" === "104")
val df3 = df2.withColumn("event_name", lit("event_name_changed"))

// update hudi dataset
df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option("hoodie.compact.inline", "false")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Append)
  .save(tablePath)
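For reference, reading the table back through the Spark datasource (the same read the update step above uses) shows the merged record. A minimal verification sketch, reusing tablePath from the repro:

// Verify the upsert via the Hudi Spark datasource (snapshot/real-time view).
// Expects "event_name_changed" in place of "event_name_123".
val verify = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
verify.select("event_id", "event_name").show(false)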
Now when querying the real-time table from Hive, we have no issue seeing the updated value:
hive> select event_name from hudi_events_mor_1_rt;
OK
event_name_900
event_name_changed
event_name_546
event_name_678
Time taken: 0.103 seconds, Fetched: 4 row(s)
But when querying the real-time table from Presto, only the base parquet file is read, so the update that should be merged in from the log file is not visible:
presto:default> select event_name from hudi_events_mor_1_rt;
event_name
----------------
event_name_900
event_name_123
event_name_546
event_name_678
(4 rows)
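One way to confirm what Presto is skipping is to list the updated partition on storage: after the upsert, the partition for event_type "type1" (the partition of event_id 104) should contain both the base parquet file and a Hudi log file. A sketch using the Hadoop FileSystem API, reusing tablePath from the repro:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the files in the updated partition. After the upsert we expect one
// *.parquet base file plus a .log delta file that Presto is not reading.
val fs = FileSystem.get(new java.net.URI(tablePath), spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(tablePath + "/type1")).foreach(st => println(st.getPath.getName))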
Our current understanding of this issue is that HoodieParquetRealtimeInputFormat correctly generates the splits, but the RealtimeCompactedRecordReader record reader is never used, so Presto reads only the base parquet file and never merges in the log file.
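To make the expected behavior concrete: the real-time view is supposed to merge log-file records over base-file records by record key. An illustrative sketch of that merge semantics (not Hudi's actual reader code), using the values from this repro:

// Illustrative only: the merge the _rt view should apply, keyed by record key.
// Log-file records override base-file records with the same key.
val baseRows = Map("100" -> "event_name_900", "101" -> "event_name_546",
                   "104" -> "event_name_123", "105" -> "event_name_678")
val logRows  = Map("104" -> "event_name_changed")
val merged   = baseRows ++ logRows
// merged("104") == "event_name_changed"; Hive returns this, Presto returns the base row.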
Attachments
Issue Links
- is blocked by: HUDI-907 Test Presto mor query support changes in HDFS Env (Open)
- links to