Details
- Type: Bug
- Status: Closed
- Priority: Blocker
- Resolution: Fixed
- Affects Version/s: 0.11.0
- Fix Version/s: None
Description
When querying a Hudi table through Presto's Hive connector after a clustering action has completed on the table, the query result contains duplicate records.
Environment: Presto 0.274-SNAPSHOT (latest), Hudi 0.11
Steps to reproduce:
1. Write a Hudi table with inline clustering enabled:
./bin/spark-shell \
  --master yarn \
  --deploy-mode client \
  --driver-memory 8g \
  --executor-memory 8g \
  --num-executors 20 \
  --executor-cores 4 \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryoserializer.buffer=256m \
  --conf spark.kryoserializer.buffer.max=1024m \
  --conf "spark.driver.defaultJavaOptions=-XX:+UseG1GC" \
  --conf "spark.executor.defaultJavaOptions=-XX:+UseG1GC" \
  --conf spark.ui.proxyBase="" \
  --conf 'spark.eventLog.enabled=true' \
  --conf 'spark.eventLog.dir=hdfs:///var/log/spark/apps' \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode

val srcPath = "s3a://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_clustered"
val tablePath = <>

val inputDF = spark.read.format("parquet").load(srcPath)

inputDF.write.
  format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
  option(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true").
  option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "0").
  option(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS, "43").
  option(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS, "100").
  option(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY, "star_rating,total_votes").
  option("hoodie.metadata.index.column.stats.enable", "true").
  option(BULK_INSERT_SORT_MODE.key(), "NONE").
  mode(SaveMode.Overwrite).
  save(tablePath)
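Optional sanity check (not part of the original report): before querying from Presto, the same spark-shell session can confirm that the inline clustering actually completed. A completed clustering leaves a <instant>.replacecommit file under the table's .hoodie directory; tablePath is the value used in the write above.

import org.apache.hadoop.fs.Path

val fs = new Path(tablePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
// List completed replacecommit instants on the timeline; an empty result
// means the clustering did not finish and the repro precondition is not met.
fs.listStatus(new Path(tablePath, ".hoodie"))
  .map(_.getPath.getName)
  .filter(_.endsWith(".replacecommit"))
  .foreach(println)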
2. Query the table using the Hive connector in Presto:
./presto-cli --catalog hudi --server localhost:9090
select count(review_id) from <table_name> where star_rating > 4 and total_votes > 10;
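A hedged cross-check from Spark (not in the original steps) makes the duplication visible without Presto: the Hudi datasource honors the replacecommit metadata and returns the expected count, while reading the same path as plain parquet scans replaced and replacement base files alike, which is what the Hive connector appears to be doing here.

// Expected answer: the Hudi datasource filters out replaced file groups.
val hudiDF = spark.read.format("hudi").load(tablePath)
println(hudiDF.filter("star_rating > 4 and total_votes > 10").count())

// Naive read: plain parquet over the same path also sees the replaced base
// files (until they are cleaned), so the count is inflated by the duplicates.
val rawDF = spark.read.format("parquet").load(tablePath)
println(rawDF.filter("star_rating > 4 and total_votes > 10").count())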
3. The result differs from that of an otherwise identical Hudi table written without clustering:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.spark.sql.SaveMode
import org.apache.hudi.config.HoodieWriteConfig._

val srcPath = "s3a://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_no_clustering"
val tablePath = <>

val inputDF = spark.read.format("parquet").load(srcPath)

inputDF.write.format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
  option("hoodie.metadata.index.column.stats.enable", "true").
  option(BULK_INSERT_SORT_MODE.key(), "NONE").
  mode(SaveMode.Overwrite).
  save(tablePath)
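A minimal sketch, assuming the non-clustered tablePath just written: since no file groups were ever replaced in this table, Spark and Presto should agree on the count below, which serves as the baseline the clustered table is compared against.

val baseline = spark.read.format("hudi").load(tablePath).
  filter("star_rating > 4 and total_votes > 10").
  count()
println(s"baseline count = $baseline")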
Issue Links
- relates to HUDI-4307: Document version where replaced filegroups are not being filtered out (Closed)
- links to