Apache Hudi / HUDI-1489

Not able to read after updating bootstrap table with written table


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.0
    • Component/s: None

    Description

      After bootstrapping a Hudi table and then writing an update (upsert) to it, reading the latest version of the table fails.

      Reproduction steps

      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.common.model.HoodieTableType
      import org.apache.hudi.config.HoodieBootstrapConfig
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.{col, lit}
      
          val bucketName = "wenningd-dev"
          val tableName = "hudi_bootstrap_test_cow_5c1a5147_888e_4b638bef8"
          val recordKeyName = "event_id"
          val partitionKeyName = "event_type"
          val precombineKeyName = "event_time"
          val verificationRecordKey = "4"
          val verificationColumn = "event_name"
          val originalVerificationValue = "event_d"
          val updatedVerificationValue = "event_test"
          val sourceTableLocation = "s3://wenningd-dev/hudi/test-data/source_table/"
      
          val tableType = HoodieTableType.COPY_ON_WRITE.name()
          val verificationSqlQuery = "select " + verificationColumn + " from " + tableName + " where " + recordKeyName + " = '" + verificationRecordKey + "'"
          val tablePath = "s3://" + bucketName + "/hudi/tables/" + tableName
          val loadTablePath = tablePath + "/*/*"
      
          // Create the table by bootstrapping from sourceTableLocation and sync with Hive.
          // The bootstrap write needs only an empty DataFrame; the data is taken from
          // the source table location.
          val df = spark.emptyDataFrame

          df.write
            .format("hudi")
            .option(HoodieWriteConfig.TABLE_NAME, tableName)
            .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, sourceTableLocation)
            .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
            .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
            .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
            .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
            .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
            .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
            .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, partitionKeyName)
            .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
            .mode(SaveMode.Overwrite)
            .save(tablePath)
      
          // Verify the bootstrap with a Spark SQL query against the Hive-synced table.
          // TestFailureException is a custom exception from the test harness.
          val result0 = spark.sql(verificationSqlQuery)
          if (result0.count != 1 || !result0.collect.mkString.contains(originalVerificationValue)) {
            throw new TestFailureException("Create table verification failed!")
          }
      
      
          // Update a single record in the bootstrapped table and upsert it back.
          val df3 = spark.read.format("org.apache.hudi").load(loadTablePath)
          val df4 = df3.filter(col(recordKeyName) === verificationRecordKey)
          val df5 = df4.withColumn(verificationColumn, lit(updatedVerificationValue))
          df5.write.format("org.apache.hudi")
            .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType)
            .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
            .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKeyName)
            .option(HoodieWriteConfig.TABLE_NAME, tableName)
            .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
            .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, precombineKeyName)
            .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
            .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
            .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, partitionKeyName)
            .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
            .mode(SaveMode.Append)
            .save(tablePath)

          // Read the table again after the upsert.
          val result1 = spark.sql(verificationSqlQuery)
          val df6 = spark.read.format("org.apache.hudi").load(loadTablePath)
          df6.show
      

      df6.show then fails with the following exception:

      Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:407)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3395)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
        at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
        at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2552)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2766)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:712)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:721)
        ... 49 elided
      Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.arrayData(WritableColumnVector.java:637)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:378)
        at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getUTF8String(MutableColumnarRow.java:135)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:297)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:289)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
      

      The root cause is that the requiredColumns passed to buildScan() do not follow the same order as the schema file.

      For example, when I selected all the columns, I printed the requiredColumns:

       

      20/10/13 22:57:59 WARN HoodieBootstrapRelation: wenningd = > required columns: _hoodie_commit_time _hoodie_record_key _hoodie_partition_path event_type event_id event_guests event_time _hoodie_commit_seqno _hoodie_file_name event_name

      You can see that not all of the metadata columns are at the front. The problem is that when we use regularReadFunction, we build the schema as requiredSkeletonSchema.fields ++ requiredDataSchema.fields, which puts all of the skeleton (metadata) fields before all of the data fields. Since the required columns do not follow the same order as the schema file, there is a mismatch between requiredSchema and requiredColumns.
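
      To make the ordering problem concrete, here is a minimal sketch using plain Spark StructTypes. This is an illustration only, not the actual HoodieBootstrapRelation code: the column names come from the log line above, and the field types are placeholders.

      import org.apache.spark.sql.types._

      // Hudi metadata (skeleton) columns and the data columns of the source table.
      val skeletonSchema = StructType(Seq(
        StructField("_hoodie_commit_time", StringType),
        StructField("_hoodie_commit_seqno", StringType),
        StructField("_hoodie_record_key", StringType),
        StructField("_hoodie_partition_path", StringType),
        StructField("_hoodie_file_name", StringType)))

      val dataSchema = StructType(Seq(
        StructField("event_type", StringType),
        StructField("event_id", StringType),
        StructField("event_guests", StringType),
        StructField("event_time", StringType),
        StructField("event_name", StringType)))

      // Column order as it arrives in buildScan(), copied from the log line above.
      val requiredColumns = Array("_hoodie_commit_time", "_hoodie_record_key",
        "_hoodie_partition_path", "event_type", "event_id", "event_guests",
        "event_time", "_hoodie_commit_seqno", "_hoodie_file_name", "event_name")

      // Schema built as requiredSkeletonSchema.fields ++ requiredDataSchema.fields:
      // all metadata columns first, then all data columns.
      val requiredSkeletonSchema = StructType(skeletonSchema.filter(f => requiredColumns.contains(f.name)))
      val requiredDataSchema = StructType(dataSchema.filter(f => requiredColumns.contains(f.name)))
      val requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields)

      println(requiredColumns.mkString(", "))
      // _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, event_type, ...
      println(requiredSchema.fieldNames.mkString(", "))
      // _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, ...

      // Rows produced under requiredSchema are consumed positionally against
      // requiredColumns, so columns get misaligned; reading a misaligned column as a
      // string lines up with the NPE in WritableColumnVector.getUTF8String above.
      // One way to avoid the mismatch is to build the required schema in the exact
      // order of requiredColumns (an illustration, not necessarily the fix that
      // shipped in 0.7.0):
      val fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
      val alignedSchema = StructType(requiredColumns.map(c => fullSchema(c)))

      With the aligned ordering, the schema handed to the read function matches the column order the scan actually requests.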

People

    Assignee: Wenning Ding (wenningd)
    Reporter: Wenning Ding (wenningd)
