HUDI-305: Presto MOR "_rt" queries only read the base parquet file



    Description

      Code example to reproduce:

      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.functions.lit // used for the update below
      import spark.implicits._ // for toDF and the $-column syntax
      
      val df = Seq(
        ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
        ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
        ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
        ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
        ).toDF("event_id", "event_name", "event_ts", "event_type")
      
      val tableName = "hudi_events_mor_1"
      val tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
      
      // initial insert: write the Hudi MOR dataset and sync the table to the Hive metastore
      df.write.format("org.apache.hudi")
        .option(HoodieWriteConfig.TABLE_NAME, tableName)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
        .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
        .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
        .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
        .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
        .mode(SaveMode.Overwrite)
        .save(tablePath)
      
      // update the record with event_id 104: event_name "event_name_123" => "event_name_changed"
      val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
      val df2 = df1.filter($"event_id" === "104")
      val df3 = df2.withColumn("event_name", lit("event_name_changed"))
      
      // upsert the change; inline compaction is disabled, so the update goes to a log file rather than the base parquet file
      df3.write.format("org.apache.hudi")
         .option(HoodieWriteConfig.TABLE_NAME, tableName)
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
         .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
         .option("hoodie.compact.inline", "false")
         .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
         .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
         .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
         .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
         .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
         .mode(SaveMode.Append)
         .save(tablePath)
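
      Since inline compaction is disabled, this upsert should leave the base parquet file untouched and write the change to a delta log file next to it. As a quick sanity check, the partition directory can be listed with the standard Hadoop FileSystem API (a sketch reusing tablePath from above; the exact log-file naming is Hudi-internal, but log files are dot-prefixed, e.g. ".<fileId>_<commitTime>.log.1"):

      import org.apache.hadoop.fs.{FileSystem, Path}

      // list one partition; expect a base .parquet file plus a
      // dot-prefixed .log file that carries the upserted record
      val fs = FileSystem.get(new java.net.URI(tablePath), spark.sparkContext.hadoopConfiguration)
      fs.listStatus(new Path(tablePath + "/type1"))
        .map(_.getPath.getName)
        .foreach(println)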
      

      Now when querying the real-time table from Hive, we have no issue seeing the updated value:

      hive> select event_name from hudi_events_mor_1_rt;
      OK
      event_name_900
      event_name_changed
      event_name_546
      event_name_678
      Time taken: 0.103 seconds, Fetched: 4 row(s)
      

      But when querying the real-time table from Presto, we only read the base parquet file and do not see the update that should be merged in from the log file.

      presto:default> select event_name from hudi_events_mor_1_rt;
         event_name
      ----------------
       event_name_900
       event_name_123
       event_name_546
       event_name_678
      (4 rows)
      

      Our current understanding of this issue is that, while HoodieParquetRealtimeInputFormat correctly generates the realtime splits, the RealtimeCompactedRecordReader is never instantiated, so the log file is never read and only the base parquet file is returned.
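
      One way to verify that hypothesis outside of Presto is to drive the mapred API directly, the way Hive does, and inspect which reader class comes back for a realtime split. A rough sketch (the calls below are the standard org.apache.hadoop.mapred API plus the Hudi input format named above; additional job configuration, such as projected columns, may be needed in practice, so treat this as an outline rather than a working test):

      import org.apache.hadoop.mapred.{FileInputFormat, JobConf, Reporter}
      import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat

      val conf = new JobConf(spark.sparkContext.hadoopConfiguration)
      FileInputFormat.setInputPaths(conf, tablePath + "/type1")

      val inputFormat = new HoodieParquetRealtimeInputFormat()
      val splits = inputFormat.getSplits(conf, 1)
      splits.foreach { split =>
        // for an _rt query this should be a realtime record reader that
        // wraps the parquet reader and merges in the log file; a plain
        // parquet reader here would reproduce the Presto behavior
        val reader = inputFormat.getRecordReader(split, conf, Reporter.NULL)
        println(reader.getClass.getName)
      }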

       


            People

              Assignee: Bhavani Sudha (bhavanisudha)
              Reporter: Brandon Scheller (bdscheller)
