Apache Hudi
HUDI-1658 [UMBRELLA] Spark Sql Support For Hudi
HUDI-2467

Delete data is not working with 0.9.0 and pySpark


Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Component: Spark Integration

    Description

      Following this Spark quick start guide:
      https://hudi.apache.org/docs/quick-start-guide/

      Everything works until the delete data step.

      I'm using PySpark with Spark 3.1.2 and Python 3.9.

      # pyspark
      # fetch total records count
      spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
      
      # fetch two records to be deleted
      
      ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
      # issue deletes
      hudi_delete_options = {
        'hoodie.table.name': tableName,
        'hoodie.datasource.write.recordkey.field': 'uuid',
        'hoodie.datasource.write.partitionpath.field': 'partitionpath',
        'hoodie.datasource.write.table.name': tableName,
        'hoodie.datasource.write.operation': 'delete',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.upsert.shuffle.parallelism': 2,
        'hoodie.insert.shuffle.parallelism': 2
      }
      
      from pyspark.sql.functions import lit
      deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
      df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
      
      df.write.format("hudi"). \  
        options(**hudi_delete_options). \
        mode("append"). \
        save(basePath)
      
      # run the same read query as above.
      roAfterDeleteViewDF = spark. \
        read. \
        format("hudi"). \
        load(basePath) 
      
      roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
      # fetch should return (total - 2) records
      
      spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

      The count before the delete is 10, and after the delete it is still 10 (expected: 8).
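      One thing worth checking (a hypothetical diagnosis, not confirmed in this issue): the delete frame is built from positional tuples, so if the column order of the SELECT ever disagrees with the schema passed to toDF(), the record key and partition path get silently swapped and the delete matches no existing record. A plain-Python sketch of that mislabeling (the sample values "path/a", "key-1", etc. are made up):

      ```python
      # Plain-Python stand-in for the PySpark step above; sample values are invented.
      # Suppose the query actually returned columns in (partitionpath, uuid) order:
      rows = [("path/a", "key-1"), ("path/b", "key-2")]

      # Schema names are applied positionally, exactly like toDF(['uuid', 'partitionpath']):
      labeled = [dict(zip(["uuid", "partitionpath"], r)) for r in rows]

      # The 'uuid' field now holds a partition path, so a delete keyed on 'uuid'
      # would match nothing and the table would stay unchanged.
      print(labeled[0]["uuid"])           # path/a
      print(labeled[0]["partitionpath"])  # key-1
      ```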

      >>> df.show()
      +--------------------+--------------------+---+
      |       partitionpath|                uuid| ts|
      +--------------------+--------------------+---+
      |74bed794-c854-4aa...|americas/united_s...|0.0|
      |ce71c2dc-dedf-483...|americas/united_s...|0.0|
      +--------------------+--------------------+---+
      

       

      The two records to be deleted.
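      In the df.show() output above, UUID-like strings appear under partitionpath and path-like strings under uuid, which suggests the columns were crossed while building the delete frame. One way to rule out positional mix-ups is to carry the fields by name instead of by index. A minimal sketch in plain Python (pyspark.sql.Row supports name-based access like row["uuid"], so the same helper would work on collected rows; the sample values here are invented):

      ```python
      def to_delete_record(row):
          """Build a delete record by field name, never by tuple position.

          `row` is any mapping exposing the snapshot columns by name
          (a pyspark.sql.Row qualifies, since it supports row["uuid"]).
          """
          return {
              "uuid": row["uuid"],                    # Hudi record key
              "partitionpath": row["partitionpath"],  # Hudi partition path
              "ts": 0.0,                              # precombine field, as in the quickstart
          }

      # Invented sample row standing in for ds.collect() output:
      sample = {"uuid": "key-1", "partitionpath": "path/a"}
      record = to_delete_record(sample)
      # record == {"uuid": "key-1", "partitionpath": "path/a", "ts": 0.0}
      ```

      With real rows, spark.createDataFrame([to_delete_record(r) for r in ds.collect()]) would then produce a frame whose columns cannot be transposed by accident.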

      Note, the 

      People

        Assignee: Unassigned
        Reporter: Phil Chen (phi@gpm.com)
        Votes: 0
        Watchers: 2