Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Description
After upserting into a Hudi table that was written via bootstrap, reading the latest version of that bootstrap table fails.
Reproduction steps
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieBootstrapConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val bucketName = "wenningd-dev"
val tableName = "hudi_bootstrap_test_cow_5c1a5147_888e_4b638bef8"
val recordKeyName = "event_id"
val partitionKeyName = "event_type"
val precombineKeyName = "event_time"
val verificationRecordKey = "4"
val verificationColumn = "event_name"
val originalVerificationValue = "event_d"
val updatedVerificationValue = "event_test"
val sourceTableLocation = "s3://wenningd-dev/hudi/test-data/source_table/"
val tableType = HoodieTableType.COPY_ON_WRITE.name()
val verificationSqlQuery = "select " + verificationColumn + " from " + tableName + " where " + recordKeyName + " = '" + verificationRecordKey + "'"
val tablePath = "s3://" + bucketName + "/hudi/tables/" + tableName
val loadTablePath = tablePath + "/*/*"

// Create the table via bootstrap and sync with Hive
val df = spark.emptyDataFrame
df.write
  .format("hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, sourceTableLocation)
  .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, partitionKeyName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Overwrite)
  .save(tablePath)

// Verify the create with a Spark SQL query
val result0 = spark.sql(verificationSqlQuery)
if (!(result0.count == 1) || !result0.collect.mkString.contains(originalVerificationValue)) {
  throw new TestFailureException("Create table verification failed!")
}

// Update one record and upsert it back into the table
val df3 = spark.read.format("org.apache.hudi").load(loadTablePath)
val df4 = df3.filter(col(recordKeyName) === verificationRecordKey)
val df5 = df4.withColumn(verificationColumn, lit(updatedVerificationValue))
df5.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKeyName)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, precombineKeyName)
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, partitionKeyName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Append)
  .save(tablePath)

// Read the table again after the upsert
val result1 = spark.sql(verificationSqlQuery)
val df6 = spark.read.format("org.apache.hudi").load(loadTablePath)
df6.show
df6.show then fails with the following stack trace:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:407)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3395)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2552)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2766)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
at org.apache.spark.sql.Dataset.show(Dataset.scala:712)
at org.apache.spark.sql.Dataset.show(Dataset.scala:721)
... 49 elided
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.arrayData(WritableColumnVector.java:637)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:378)
at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getUTF8String(MutableColumnarRow.java:135)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:297)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:289)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
The root cause:
The requiredColumns passed to buildScan() do not follow the same order as the schema file.
For example, when I selected all the columns, the printed requiredColumns were:
20/10/13 22:57:59 WARN HoodieBootstrapRelation: wenningd = > required columns: _hoodie_commit_time _hoodie_record_key _hoodie_partition_path event_type event_id event_guests event_time _hoodie_commit_seqno _hoodie_file_name event_name
You can see that not all of the metadata columns are at the front. The problem is that when we take the regularReadFunction path, we build the read schema as requiredSkeletonSchema.fields ++ requiredDataSchema.fields. Since the required columns do not follow the same order as the schema file, there is a schema mismatch between requiredSchema and requiredColumns.
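A minimal sketch of the ordering issue follows. This is not the actual HoodieBootstrapRelation code; the schema values and the aligned/mismatched names are illustrative only, and the "fix" shown is just one way to keep the projection consistent (project the combined schema in requiredColumns order instead of concatenating the skeleton and data required fields).

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Table schema order: metadata (skeleton) fields first, then data fields.
val skeletonSchema = StructType(Seq(
  StructField("_hoodie_commit_time", StringType),
  StructField("_hoodie_commit_seqno", StringType),
  StructField("_hoodie_record_key", StringType),
  StructField("_hoodie_partition_path", StringType),
  StructField("_hoodie_file_name", StringType)))
val dataSchema = StructType(Seq(
  StructField("event_id", StringType),
  StructField("event_name", StringType),
  StructField("event_guests", StringType),
  StructField("event_time", StringType),
  StructField("event_type", StringType)))

// requiredColumns as handed to buildScan(): metadata and data columns interleaved,
// matching the WARN log above.
val requiredColumns = Array(
  "_hoodie_commit_time", "_hoodie_record_key", "_hoodie_partition_path",
  "event_type", "event_id", "event_guests", "event_time",
  "_hoodie_commit_seqno", "_hoodie_file_name", "event_name")

// Problematic composition: skeleton fields first, then data fields, which no longer
// lines up position-by-position with requiredColumns.
val requiredSkeletonSchema = StructType(skeletonSchema.filter(f => requiredColumns.contains(f.name)))
val requiredDataSchema     = StructType(dataSchema.filter(f => requiredColumns.contains(f.name)))
val mismatched = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields)

// One way to stay consistent: project the full schema in requiredColumns order.
val fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
val aligned = StructType(requiredColumns.map(c => fullSchema(c)))

assert(aligned.fieldNames.sameElements(requiredColumns))      // positions match requiredColumns
assert(!mismatched.fieldNames.sameElements(requiredColumns))  // demonstrates the mismatch

With the mismatched schema, the columnar reader materializes values under the wrong field positions, which is consistent with the NullPointerException seen in WritableColumnVector above.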