Apache Hudi / HUDI-5839

Insert in non-strict mode deduplicates dataset in "append" mode - spark


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.13.0
    • Fix Version/s: None
    • Component/s: spark, writer-core
    • Labels: None

    Description

      There seems to be a bug in non-strict insert mode when no precombine field is defined (I have not checked the case where one is defined).
      When writing through the Spark datasource, duplicates can only be inserted in "overwrite" mode, or in "append" mode when data is written to the table for the first time. Any subsequent write in "append" mode deduplicates the dataset as if the operation were an upsert. Found on master (0.13.0). Reproduction below.

      This appears to be a regression, because the same functionality works in Hudi 0.12.1.

      from pyspark.sql.functions import expr

      path = "/tmp/huditbl"

      opt_insert = {
          'hoodie.table.name': 'huditbl',
          'hoodie.datasource.write.recordkey.field': 'keyid',
          'hoodie.datasource.write.table.name': 'huditbl',
          'hoodie.datasource.write.operation': 'insert',
          'hoodie.sql.insert.mode': 'non-strict',
          'hoodie.upsert.shuffle.parallelism': 2,
          'hoodie.insert.shuffle.parallelism': 2,
          'hoodie.combine.before.upsert': 'false',
          'hoodie.combine.before.insert': 'false',
          'hoodie.datasource.write.insert.drop.duplicates': 'false'
      }

      df = spark.range(0, 10).toDF("keyid") \
          .withColumn("age", expr("keyid + 1000"))

      # First write to a fresh table.
      df.write.format("hudi") \
          .options(**opt_insert) \
          .mode("overwrite") \
          .save(path)

      spark.read.format("hudi").load(path).count()  # returns 10

      # Second write of the same 10 record keys; a non-strict insert
      # should keep both copies.
      # df = df.union(df)  # creates duplicates within one batch
      df.write.format("hudi") \
          .options(**opt_insert) \
          .mode("append") \
          .save(path)

      spark.read.format("hudi").load(path).count()  # returns 10 but should return 20

      # Note: this works, duplicates survive when writing in "overwrite" mode:
      df = df.union(df)  # creates duplicates
      df.write.format("hudi") \
          .options(**opt_insert) \
          .mode("overwrite") \
          .save(path)

      spark.read.format("hudi").load(path).count()  # returns 20 as it should

       

          People

            Assignee: Unassigned
            Reporter: kazdy
            Votes: 0
            Watchers: 1
