Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Fixed
-
None
-
None
Description
After ingesting Hudi table w/ the following configuration, i'm still getting "hoodie.table.partition.fields=partitionpath" in the "hoodie.properties", which blocks this table form being read.
scala> val readDf: DataFrame = | spark.read.option("hoodie.enable.data.skipping", "false").format("hudi").load(outputPath) java.lang.IllegalArgumentException: Cannot find column: 'partitionpath' in the schema[StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(marketplace,StringType,true),StructField(customer_id,StringType,true),StructField(review_id,StringType,true),StructField(product_id,StringType,true),StructField(product_parent,StringType,true),StructField(product_title,StringType,true),StructField(star_rating,IntegerType,true),StructField(helpful_votes,IntegerType,true),StructField(total_votes,IntegerType,true),StructField(vine,StringType,true),StructField(verified_purchase,StringType,true),StructField(review_headline,StringType,true),StructField(review_body,StringType,true),StructField(review_date,DateType,true),StructField(year,IntegerType,true),StructField(product_category,StringType,true)] at org.apache.hudi.HoodieFileIndex.$anonfun$_partitionSchemaFromProperties$3(HoodieFileIndex.scala:118) at scala.collection.MapLike.getOrElse(MapLike.scala:131) at scala.collection.MapLike.getOrElse$(MapLike.scala:129) at scala.collection.AbstractMap.getOrElse(Map.scala:63) at org.apache.hudi.HoodieFileIndex.$anonfun$_partitionSchemaFromProperties$2(HoodieFileIndex.scala:117) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198) at org.apache.hudi.HoodieFileIndex._partitionSchemaFromProperties$lzycompute(HoodieFileIndex.scala:116) at org.apache.hudi.HoodieFileIndex._partitionSchemaFromProperties(HoodieFileIndex.scala:110) at org.apache.hudi.HoodieFileIndex.getAllQueryPartitionPaths(HoodieFileIndex.scala:503) at org.apache.hudi.HoodieFileIndex.loadPartitionPathFiles(HoodieFileIndex.scala:575) at org.apache.hudi.HoodieFileIndex.refresh0(HoodieFileIndex.scala:360) at org.apache.hudi.HoodieFileIndex.<init>(HoodieFileIndex.scala:157) at org.apache.hudi.DefaultSource.getBaseFileOnlyView(DefaultSource.scala:199) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:119) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:69) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325) at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239) ... 56 elidedscala>
Example table config:
val commonOpts = Map( "hoodie.compact.inline" -> "false", "hoodie.bulk_insert.shuffle.parallelism" -> "10" ) spark.sparkContext.setLogLevel("DEBUG") //////////////////////////////////////////////////////////////// // Writing to Hudi //////////////////////////////////////////////////////////////// val fs = FSUtils.getFs(outputPath, spark.sparkContext.hadoopConfiguration) if (!fs.exists(new Path(outputPath))) { val df = spark.read.parquet(inputPath) df.write.format("hudi") .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL) .option("hoodie.table.name", tableName) .option(PRECOMBINE_FIELD.key(), "review_id") .option(RECORDKEY_FIELD.key(), "review_id") //.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "product_category") .option("hoodie.clustering.inline", "true") .option("hoodie.clustering.inline.max.commits", "1") // NOTE: Small file limit is intentionally kept _ABOVE_ target file-size max threshold for Clustering, // to force re-clustering .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(1024 * 1024 * 1024)) // 1Gb .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(128 * 1024 * 1024)) // 128Mb .option("hoodie.clustering.plan.strategy.max.num.groups", String.valueOf(4096)) .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true") .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, layoutOptStrategy) .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "product_id,customer_id") .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .option(BULK_INSERT_SORT_MODE.key(), "NONE") .options(commonOpts) .mode(ErrorIfExists) .save(outputPath) }
Attachments
Issue Links
- links to