Apache Hudi / HUDI-4290

Hive connector in Presto returns duplicate records after clustering


Details

    Description

      When a Hudi table is queried through the Hive connector in Presto after a clustering action has completed on the table, the query result contains duplicate records.

      Environment: Presto 0.274-SNAPSHOT (latest), Hudi 0.11

      Steps to reproduce:

      Write a Hudi table with inline clustering enabled:

      ./bin/spark-shell  \
           --master yarn \
           --deploy-mode client \
           --driver-memory 8g \
           --executor-memory 8g \
           --num-executors 20 \
           --executor-cores 4 \
           --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
           --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
           --conf spark.kryoserializer.buffer=256m \
           --conf spark.kryoserializer.buffer.max=1024m \
           --conf "spark.driver.defaultJavaOptions=-XX:+UseG1GC" \
           --conf "spark.executor.defaultJavaOptions=-XX:+UseG1GC" \
           --conf spark.ui.proxyBase="" \
           --conf 'spark.eventLog.enabled=true' --conf 'spark.eventLog.dir=hdfs:///var/log/spark/apps' \
           --conf "spark.sql.hive.convertMetastoreParquet=false" \
           --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
           --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'  

       

      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.hudi.config.HoodieClusteringConfig
      import org.apache.hudi.HoodieDataSourceHelpers
      import org.apache.hudi.config.HoodieWriteConfig._
      import org.apache.spark.sql.SaveMode
      
      val srcPath = "s3a://amazon-reviews-pds/parquet/"
      val tableName = "amazon_reviews_clustered"
      val tablePath = <>
      
      val inputDF = spark.read.format("parquet").load(srcPath)
      
      inputDF.write.
        format("hudi").
        option(HoodieWriteConfig.TABLE_NAME, tableName).
        option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
        option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
        option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
        option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category").
        option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
        option(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true").
        option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "0").
        option(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS, "43").
        option(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS, "100").
        option(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY, "star_rating,total_votes").
        option("hoodie.metadata.index.column.stats.enable", "true").
        option(BULK_INSERT_SORT_MODE.key(), "NONE").
        mode(SaveMode.Overwrite).
        save(tablePath) 
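
Before querying from Presto, it can help to confirm that the clustering action actually completed. The sketch below is not part of the original report: it assumes the usual Hudi 0.11 timeline layout, in which a completed clustering shows up as a `<instant>.replacecommit` file under `<tablePath>/.hoodie`, with pending instants carrying a `.requested` or `.inflight` suffix. The helper names `completedReplaceCommits` and `timelineFileNames` are hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Pure helper: keep only completed replacecommit instants.
// Pending instants end in ".replacecommit.requested" or
// ".replacecommit.inflight", so endsWith excludes them.
def completedReplaceCommits(fileNames: Seq[String]): Seq[String] =
  fileNames.filter(_.endsWith(".replacecommit")).sorted

// Lists the file names directly under <basePath>/.hoodie
// (assumed timeline location).
def timelineFileNames(basePath: String, conf: Configuration): Seq[String] = {
  val timelineDir = new Path(basePath, ".hoodie")
  val fs = timelineDir.getFileSystem(conf)
  fs.listStatus(timelineDir).map(_.getPath.getName).toSeq
}

// Usage in the spark-shell session above (tablePath is the table's base path):
// completedReplaceCommits(
//   timelineFileNames(tablePath, spark.sparkContext.hadoopConfiguration)
// ).foreach(println)
```

If no completed replacecommit is listed, clustering has not finished and the duplicate-record symptom described here would not be expected to apply yet.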

      Query the table using Hive connector in Presto:

      ./presto-cli --catalog hudi --server localhost:9090
      
      select count(review_id) from <table_name> where star_rating > 4 and total_votes > 10;

      The count returned differs from the count the same query returns against a Hudi table written without clustering, such as the one below:

      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.hudi.HoodieDataSourceHelpers
      import org.apache.spark.sql.SaveMode
      import org.apache.hudi.config.HoodieWriteConfig._
      
      val srcPath = "s3a://amazon-reviews-pds/parquet/"
      val tableName = "amazon_reviews_no_clustering"
      val tablePath = <>
      val inputDF = spark.read.format("parquet").load(srcPath)
      
      inputDF.write.
        format("hudi").
        option(HoodieWriteConfig.TABLE_NAME, tableName).
        option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
        option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
        option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
        option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category").
        option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
        option("hoodie.metadata.index.column.stats.enable", "true").
        option(BULK_INSERT_SORT_MODE.key(), "NONE").
        mode(SaveMode.Overwrite).
        save(tablePath) 
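
To tell whether the extra rows in the Presto result are duplicated record keys, one option is to compute a reference count and a per-key duplicate check from Spark for each of the two tables. This is a minimal sketch, not part of the original report. It assumes that Spark's own Hudi reader returns the correct snapshot, and the helper names `duplicateKeys` and `baselineCount` are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Record keys that appear more than once, with their occurrence counts.
def duplicateKeys(df: DataFrame, keyCol: String): DataFrame =
  df.groupBy(col(keyCol)).count().filter(col("count") > 1)

// Spark-side equivalent of the Presto query. review_id is the record key,
// so it is assumed non-null and a row count matches count(review_id).
def baselineCount(df: DataFrame): Long =
  df.filter(col("star_rating") > 4 && col("total_votes") > 10).count()

// Usage in spark-shell, once per table (tablePath is that table's base path):
// val df = spark.read.format("hudi").load(tablePath)
// println(s"baseline = ${baselineCount(df)}, " +
//   s"duplicated keys = ${duplicateKeys(df, "review_id").count()}")
```

If Spark reports no duplicated keys and the same baseline for both tables while Presto returns a higher count for the clustered one, that points at the Presto read path rather than at the data written.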

       

       

       

            People

              codope Sagar Sumit
              guoyihua Ethan Guo