Apache Hudi / HUDI-4384

Hive-style partitioning does not work and record key loses its prefix when using ComplexKeyGenerator in bulk_insert


Details

    Description

      When using bulk_insert in 0.11.1, "hoodie.datasource.write.hive_style_partitioning" does not take effect: partition directories are written as plain values instead of "<field>=<value>".

      When using "org.apache.hudi.keygen.ComplexKeyGenerator", the "_hoodie_record_key" column is written without the field-name prefix: it contains only the bare key value instead of "key:<value>".

      There is a GitHub issue reported for this: https://github.com/apache/hudi/issues/6070

      We can reproduce this bug with the following code:

        import java.io.File

        import org.apache.avro.Schema
        import org.apache.spark.sql.avro.SchemaConverters
        import org.apache.spark.sql.types.StructType
        import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

        def main(args: Array[String]): Unit = {
          // Kryo serializer as recommended in the Hudi quickstart
          val spark = SparkSession.builder()
            .appName("hudi-4384-repro")
            .master("local[*]")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .getOrCreate()

          // Schema and sample data from the Hudi docker demo
          val avroSchema = new Schema.Parser().parse(new File("~/hudi/docker/demo/config/schema.avsc"))
          val schema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
          val df = spark.read.schema(schema).json("file://~/hudi/docker/demo/data/batch_1.json")

          // Common write options shared by both operations
          val options = Map (
            "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator",
            "hoodie.bulkinsert.sort.mode" -> "GLOBAL_SORT",
            "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
            "hoodie.datasource.write.precombine.field" -> "ts",
            "hoodie.datasource.write.recordkey.field" -> "key",
            "hoodie.datasource.write.partitionpath.field" -> "year",
            "hoodie.datasource.write.hive_style_partitioning" -> "true",
            "hoodie.datasource.hive_sync.enable" -> "false",
            "hoodie.datasource.hive_sync.partition_fields" -> "year",
            "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor"
          )

          // Write the same DataFrame twice: once with bulk_insert, once with insert
          bulkInsert(df, options)
          insert(df, options)
        }

        // bulk_insert path: record key prefix and hive-style partitioning are lost
        def bulkInsert(df: DataFrame, options: Map[String, String]): Unit = {
          val allOptions: Map[String, String] = options ++ Map (
            "hoodie.datasource.write.operation" -> "bulk_insert",
            "hoodie.table.name" -> "test_hudi_bulk_table"
          )

          df.write.format("hudi")
            .options(allOptions)
            .mode(SaveMode.Overwrite)
            .save("file://~/test_hudi_bulk_table")
        }

        // insert path: behaves as expected and is used here for comparison
        def insert(df: DataFrame, options: Map[String, String]): Unit = {
          val allOptions: Map[String, String] = options ++ Map (
            "hoodie.datasource.write.operation" -> "insert",
            "hoodie.table.name" -> "test_hudi_insert_table"
          )

          df.write.format("hudi")
            .options(allOptions)
            .mode(SaveMode.Overwrite)
            .save("file://~/test_hudi_insert_table")
        }

      The data written to the files is shown in the attached screenshots.
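
      As a quick check, the written values can also be read back and compared (a sketch that assumes the same SparkSession and output paths as in the reproduction code above):

        val bulk = spark.read.format("hudi").load("file://~/test_hudi_bulk_table")
        val ins  = spark.read.format("hudi").load("file://~/test_hudi_insert_table")

        // For bulk_insert the record key reportedly lacks the "key:" prefix and the
        // partition path lacks the "year=" prefix; the insert table shows both as expected.
        bulk.select("_hoodie_record_key", "_hoodie_partition_path").distinct().show(truncate = false)
        ins.select("_hoodie_record_key", "_hoodie_partition_path").distinct().show(truncate = false)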


            People

              Assignee: Unassigned
              Reporter: Teng Huo
              Votes: 0
              Watchers: 2
