Apache Hudi
HUDI-1719

Hive on Spark/MR: incremental query of a MOR table returns an incorrect partition field

      Description

      Hudi currently uses HoodieCombineHiveInputFormat to implement incremental queries on MOR tables.

      When small files exist in different partitions, HoodieCombineHiveInputFormat combines their file readers into one split. It builds the partition fields from the first file reader in the split, even though the split also holds readers that come from other partitions. When switching readers, the IOContext should be updated so that the partition fields match the reader currently in use.
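The fix idea can be sketched in isolation. The classes below are a simplified, hypothetical stand-in, not Hudi's actual code: a combined reader that rebuilds its partition fields every time it advances to the next underlying file reader, rather than computing them once from the first reader.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one underlying file reader; a real reader
// would also carry a RecordReader, schema, file split, etc.
class FileReader {
    final Map<String, String> partitionValues;
    private final Iterator<Long> keys;

    FileReader(Map<String, String> partitionValues, List<Long> keys) {
        this.partitionValues = partitionValues;
        this.keys = keys.iterator();
    }

    boolean hasNext() { return keys.hasNext(); }
    long next() { return keys.next(); }
}

// Combined reader over several files. The point of the fix: the partition
// fields are refreshed from the *current* reader on every switch, instead
// of being derived once from the first reader in the combined split.
class CombinedReader {
    private final Iterator<FileReader> readers;
    private FileReader current;
    private Map<String, String> currentPartition;

    CombinedReader(List<FileReader> files) {
        this.readers = files.iterator();
        advance();
    }

    private void advance() {
        current = readers.hasNext() ? readers.next() : null;
        if (current != null) {
            // Refresh the "io context" so partition fields track this file.
            currentPartition = current.partitionValues;
        }
    }

    // Emits "p2=<value> keyid=<key>" rows, or null when exhausted.
    String next() {
        while (current != null && !current.hasNext()) advance();
        if (current == null) return null;
        return "p2=" + currentPartition.get("p2") + " keyid=" + current.next();
    }
}

public class CombineFix {
    public static void main(String[] args) {
        // Two small files from different partitions, as in the repro below.
        CombinedReader r = new CombinedReader(List.of(
            new FileReader(Map.of("p2", "6"), List.of(0L, 1L)),
            new FileReader(Map.of("p2", "7"), List.of(0L, 1L))));
        for (String row; (row = r.next()) != null; ) {
            System.out.println(row); // rows from the second file report p2=7
        }
    }
}
```

With this refresh in place, rows coming from the second file carry their own partition values instead of inheriting the first file's.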

      Test environment:

      Spark 2.4.5, Hadoop 3.1.1, Hive 3.1.1

      Test steps:

      Step 1: bulk_insert the initial data (partition p2 = 6):

      val df = spark.range(0, 10000).toDF("keyid")
      .withColumn("col3", expr("keyid + 10000000"))
      .withColumn("p", lit(0))
      .withColumn("p1", lit(0))
      .withColumn("p2", lit(6))
      .withColumn("a1", lit(Array[String]("sb1", "rz")))
      .withColumn("a2", lit(Array[String]("sb1", "rz")))

      // create a hudi table with three partition levels: p, p1, p2

      merge(df, 4, "default", "hive_8b", DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")

       

      Step 2: upsert new data into partition p2 = 7:

      val df = spark.range(0, 10000).toDF("keyid")
      .withColumn("col3", expr("keyid + 10000000"))
      .withColumn("p", lit(0))
      .withColumn("p1", lit(0))
      .withColumn("p2", lit(7))
      .withColumn("a1", lit(Array[String]("sb1", "rz")))
      .withColumn("a2", lit(Array[String]("sb1", "rz")))

      // upsert into the same table

      merge(df, 4, "default", "hive_8b", DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")

      Step 3: run the incremental query from Hive beeline:

      set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

      set hoodie.hive_8b.consume.mode=INCREMENTAL;

      set hoodie.hive_8b.consume.max.commits=3;

      set hoodie.hive_8b.consume.start.timestamp=20210325141300; -- this timestamp is earlier than the earliest commit, so the query covers all commits

      select `p`, `p1`, `p2`,`keyid` from hive_8b_rt where `_hoodie_commit_time`>'20210325141300' and `keyid` < 5;

      query result:

      +----+-----+-----+--------+
      | p  | p1  | p2  | keyid  |
      +----+-----+-----+--------+
      | 0  | 0   | 6   | 0      |
      | 0  | 0   | 6   | 1      |
      | 0  | 0   | 6   | 2      |
      | 0  | 0   | 6   | 3      |
      | 0  | 0   | 6   | 4      |
      | 0  | 0   | 6   | 4      |
      | 0  | 0   | 6   | 0      |
      | 0  | 0   | 6   | 3      |
      | 0  | 0   | 6   | 2      |
      | 0  | 0   | 6   | 1      |
      +----+-----+-----+--------+

      This result is wrong: step 2 inserted new data with p2 = 7, yet no row in the result shows p2 = 7. Every row reports p2 = 6, even the duplicated keyids that come from the p2 = 7 partition.
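The symptom above matches the buggy behavior exactly. As a contrast to the fix sketch, here is a minimal, hypothetical simulation (not Hudi's actual code) of a combined read that derives the partition fields once, from its first file, and never refreshes them when switching files:

```java
import java.util.ArrayList;
import java.util.List;

public class CombineBug {
    // Simulates a combined read over two files whose partition values
    // differ (p2=6 and p2=7), with keyids 0..4 in each file.
    static List<String> buggyCombinedRead() {
        List<String> out = new ArrayList<>();
        // Bug: partition fields are built from the FIRST file only...
        String cachedPartition = "p2=6";
        for (long k = 0; k < 5; k++) {
            out.add(cachedPartition + " keyid=" + k); // rows from p2=6
        }
        // ...and are never refreshed when the reader switches files,
        // so rows from the second file inherit the stale value.
        for (long k = 0; k < 5; k++) {
            out.add(cachedPartition + " keyid=" + k); // rows from p2=7!
        }
        return out;
    }

    public static void main(String[] args) {
        // All ten rows claim p2=6 even though five of them live in p2=7,
        // mirroring the incremental query output above.
        buggyCombinedRead().forEach(System.out::println);
    }
}
```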

            People

            Assignee: xiaotaotao tao meng
            Reporter: xiaotaotao tao meng
