Apache Hudi / HUDI-2058

Support incremental query for insert_overwrite_table/insert_overwrite operations on COW tables


Details

    Description

      When an incremental query spans multiple commits before and after a replacecommit, the query result contains the data of the old (replaced) files. Note: MOR tables are fine; only COW tables have this problem.

      When querying the incremental view of a COW table, the replacecommit is ignored, which leads to the wrong result.
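      Conceptually, an incremental read that spans a replacecommit must drop the file groups that the replacecommit replaced. A minimal sketch of that filtering idea; the names and types here are hypothetical illustrations, not Hudi's actual internals:

      // Hypothetical model: a replacecommit records the file groups it replaced.
      case class FileSliceRef(fileId: String, commitTime: String, path: String)

      // Exclude every slice whose file group was replaced by a replacecommit
      // inside the queried instant range; only the replacing files stay visible.
      def dropReplacedSlices(candidates: Seq[FileSliceRef],
                             replacedFileIds: Set[String]): Seq[FileSliceRef] =
        candidates.filterNot(s => replacedFileIds.contains(s.fileId))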
       
       
      Test steps:

      Step 1: create a DataFrame with ten rows (keyid 0 to 9).

      import org.apache.spark.sql.functions.{expr, lit}

      val df = spark.range(0, 10).toDF("keyid")
        .withColumn("col3", expr("keyid"))
        .withColumn("age", lit(1))
        .withColumn("p", lit(2))
       
      Step 2: insert df into an empty Hudi table.

      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.spark.sql.SaveMode

      df.write.format("hudi")
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3")
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "")
        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert")
        .option("hoodie.insert.shuffle.parallelism", "4")
        .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
        .mode(SaveMode.Overwrite)
        .save(basePath)
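      The instant created by this first write can be captured from the timeline for later comparison; a minimal sketch, assuming org.apache.hudi.HoodieDataSourceHelpers and the default Hadoop configuration:

      import org.apache.hadoop.fs.FileSystem
      import org.apache.hudi.HoodieDataSourceHelpers

      // filesystem hosting the table (assumption: default Hadoop conf)
      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      // latest completed instant; after step 2 this is the initial insert commit
      val firstCommit = HoodieDataSourceHelpers.latestCommit(fs, basePath)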
       
      Step 3: run an insert_overwrite_table write on the same data.

      df.write.format("hudi")
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3")
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "")
        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert_overwrite_table")
        .option("hoodie.insert.shuffle.parallelism", "4")
        .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
        .mode(SaveMode.Append)
        .save(basePath)
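      After this write the timeline should contain the initial commit plus a replacecommit. A sketch of how the instants (and the currentCommits value used in step 4) might be obtained, assuming the 0.9-era timeline API and that currentCommits(0) is meant to resolve to the latest instant, i.e. the replacecommit:

      import scala.collection.JavaConverters._
      import org.apache.hudi.common.table.HoodieTableMetaClient

      val metaClient = HoodieTableMetaClient.builder()
        .setConf(spark.sparkContext.hadoopConfiguration)
        .setBasePath(basePath)
        .build()
      // all completed instants, oldest first; the last action should be "replacecommit"
      val instants = metaClient.getActiveTimeline.filterCompletedInstants()
        .getInstants.iterator().asScala.toList
      instants.foreach(i => println(s"${i.getTimestamp} ${i.getAction}"))
      // assumption: currentCommits holds instant times latest-first
      val currentCommits = instants.map(_.getTimestamp).reverse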
       
      Step 4: query the table incrementally.

      import org.apache.hudi.DataSourceReadOptions

      spark.read.format("hudi")
        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "0000")
        .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, currentCommits(0))
        .load(basePath).select("keyid").orderBy("keyid").show(100, false)
       
      Result: the output contains the old data; each key appears twice (once from the files replaced in step 2, once from step 3).
      +-----+
      |keyid|
      +-----+
      |0    |
      |0    |
      |1    |
      |1    |
      |2    |
      |2    |
      |3    |
      |3    |
      |4    |
      |4    |
      |5    |
      |5    |
      |6    |
      |6    |
      |7    |
      |7    |
      |8    |
      |8    |
      |9    |
      |9    |
      +-----+
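      For comparison, the expected output (and what the same steps return on a MOR table, per the note in the description) contains only the data visible after the replacecommit, each key exactly once:

      +-----+
      |keyid|
      +-----+
      |0    |
      |1    |
      |2    |
      |3    |
      |4    |
      |5    |
      |6    |
      |7    |
      |8    |
      |9    |
      +-----+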
       

People

    Assignee: tao meng
    Reporter: tao meng