Apache Hudi / HUDI-2059

When log files exist in a MOR table and clustering is triggered, the query result shows that update records in the logs are lost


Details

    Description

      When log files exist in a MOR table and clustering is triggered, the query result shows that the update records in the logs are lost.

      The cause of this problem: Hudi uses HoodieFileSliceReader to read table data before clustering. HoodieFileSliceReader calls HoodieMergedLogRecordScanner.processNextRecord to merge the updated values from the log files with the old values from the base file. In that call the old value is kept and the updated value is discarded, which is wrong.
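The faulty merge can be illustrated with a simplified, self-contained sketch (plain Scala, no Hudi dependency; `Rec`, `mergeBuggy`, and `mergeFixed` are hypothetical stand-ins for the record map and merge step inside HoodieMergedLogRecordScanner.processNextRecord, with `orderingVal` playing the role of the precombine field `col3`):

```scala
// Simplified model of the scanner's in-memory record map keyed by record key.
case class Rec(key: Long, orderingVal: Long, age: Int)

object MergeDemo {
  // Buggy behavior described in this issue: when a record with the same key
  // is already present, the old (base-file) value is kept and the incoming
  // log record is silently discarded.
  def mergeBuggy(records: collection.mutable.Map[Long, Rec], incoming: Rec): Unit =
    if (!records.contains(incoming.key)) records(incoming.key) = incoming

  // Intended behavior: precombine old and new on the ordering value, so the
  // update from the log file wins unless the old record is strictly newer.
  def mergeFixed(records: collection.mutable.Map[Long, Rec], incoming: Rec): Unit =
    records.get(incoming.key) match {
      case Some(old) if old.orderingVal > incoming.orderingVal => // keep old
      case _ => records(incoming.key) = incoming
    }

  def main(args: Array[String]): Unit = {
    val base   = Rec(0L, 0L, age = 1)    // value from the base parquet file
    val update = Rec(0L, 0L, age = 1001) // update sitting in the log file

    val buggy = collection.mutable.Map(base.key -> base)
    mergeBuggy(buggy, update)
    println(buggy(0L).age)               // 1: the update is lost

    val fixed = collection.mutable.Map(base.key -> base)
    mergeFixed(fixed, update)
    println(fixed(0L).age)               // 1001: the update survives
  }
}
```

With equal ordering values (here `col3 = keyid` in both writes), the fixed merge keeps the incoming log record, matching the usual latest-write-wins precombine semantics.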

      Test steps (run in a spark-shell session; `spark` and `basePath` are assumed to be defined):

      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.functions.{expr, lit}

      // step 1: create a Hudi MOR table
      val df = spark.range(0, 1000).toDF("keyid")
        .withColumn("col3", expr("keyid"))
        .withColumn("age", lit(1))
        .withColumn("p", lit(2))

      df.write.format("hudi")
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
        .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
        .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
        .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert")
        .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
        .option("hoodie.insert.shuffle.parallelism", "4")
        .option("hoodie.upsert.shuffle.parallelism", "4")
        .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
        .mode(SaveMode.Overwrite).save(basePath)

      // step 2: update age to 1001 for keyid < 5, producing log files
      val df1 = spark.range(0, 5).toDF("keyid")
        .withColumn("col3", expr("keyid"))
        .withColumn("age", lit(1 + 1000))
        .withColumn("p", lit(2))

      df1.write.format("hudi")
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
        .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
        .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
        .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
        .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
        .option("hoodie.insert.shuffle.parallelism", "4")
        .option("hoodie.upsert.shuffle.parallelism", "4")
        .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
        .mode(SaveMode.Append).save(basePath)

      // step 3: upsert new rows with inline clustering enabled, which triggers clustering
      val df2 = spark.range(6, 10).toDF("keyid")
        .withColumn("col3", expr("keyid"))
        .withColumn("age", lit(1 + 2000))
        .withColumn("p", lit(2))

      df2.write.format("hudi")
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
        .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
        .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
        .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
        .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
        .option("hoodie.insert.shuffle.parallelism", "4")
        .option("hoodie.upsert.shuffle.parallelism", "4")
        .option("hoodie.parquet.small.file.limit", "0")
        .option("hoodie.clustering.inline", "true")
        .option("hoodie.clustering.inline.max.commits", "1")
        .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
        .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
        .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
        .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
        .mode(SaveMode.Append).save(basePath)

      spark.read.format("hudi")
      .load(basePath).select("age").where("keyid = 0").show(100, false)

      +---+
      |age|
      +---+
      |1  |
      +---+

      This result is wrong, since we updated age for keyid = 0 to 1001 in step 2.

      People

      Assignee: tao meng (xiaotaotao)
      Reporter: tao meng (xiaotaotao)