Apache Hudi / HUDI-8719 File group reader enhancement - Phase 0 / HUDI-7610

Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled


    Description

      Here is a test that can be run on master:

       

      @Test
      def showDeleteIsInconsistent(): Unit = {
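        // Toggle the three settings below (swap the commented-out lines) to reproduce each outcome.
        val merger = classOf[HoodieSparkRecordMerger].getName
        //val merger = classOf[HoodieAvroRecordMerger].getName
        // Only the final read uses useFGReader; the writes below always enable the file group reader.
        val useFGReader = "true"
        //val useFGReader = "false"
        //val tableType = "COPY_ON_WRITE"
        val tableType = "MERGE_ON_READ"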
      
      
        val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
        val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
          (10, "2", "rider-B", "driver-B", 27.70, 1),
          (10, "3", "rider-C", "driver-C", 33.90, 10),
          (-1, "4", "rider-D", "driver-D", 34.15, 6),
          (10, "5", "rider-E", "driver-E", 17.85, 10))
      
        val inserts = spark.createDataFrame(data).toDF(columns: _*)
        inserts.write.format("hudi").
          option(RECORDKEY_FIELD.key(), "key").
          option(PRECOMBINE_FIELD.key(), "ts").
          option(TABLE_TYPE.key(), tableType).
          option("hoodie.table.name", "test_table").
          option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
          option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
          option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
          mode(SaveMode.Overwrite).
          save(basePath)
      
        val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
          (9, "2", "rider-Y", "driver-Y", 27.70, 7))
      
        val updates = spark.createDataFrame(updateData).toDF(columns: _*)
      
        updates.write.format("hudi").
          option(RECORDKEY_FIELD.key(), "key").
          option(PRECOMBINE_FIELD.key(), "ts").
          option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
          option(TABLE_TYPE.key(), tableType).
          option(OPERATION.key(), "upsert").
          option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
          option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
          option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
          mode(SaveMode.Append).
          save(basePath)
      
        // Delete key 4 with ts = -5, which is lower than the ts of -1 it was inserted with.
        val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
      
        val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
        deletes.write.format("hudi").
          option(RECORDKEY_FIELD.key(), "key").
          option(PRECOMBINE_FIELD.key(), "ts").
          option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
          option(TABLE_TYPE.key(), tableType).
          option(OPERATION.key(), "delete").
          option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
          option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
          option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
          mode(SaveMode.Append).
          save(basePath)
      
      
        // Upsert key 4 again after the delete with an even lower ts (-10), and update key 5.
        val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3),
          (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
        val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
        secondUpdates.write.format("hudi").
          option(RECORDKEY_FIELD.key(), "key").
          option(PRECOMBINE_FIELD.key(), "ts").
          option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
          option(TABLE_TYPE.key(), tableType).
          option(OPERATION.key(), "upsert").
          option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
          option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
          option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
          mode(SaveMode.Append).
          save(basePath)
      
        val df = spark.read.format("hudi").
          option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
          option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath)
        val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
        finalDf.show(100,false)
      }
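
      The three settings toggled at the top of the test give eight combinations. As a minimal sketch (plain Scala, no Hudi or Spark dependency), they can be enumerated as below so that none is missed when re-running the test by hand. Combo and DeleteMatrix are hypothetical names used only for illustration; they are not part of Hudi.

      ```scala
      // Hypothetical helper, not part of Hudi: enumerates the settings toggled in the test.
      final case class Combo(merger: String, useFGReader: Boolean, tableType: String)

      object DeleteMatrix {
        val tableTypes: Seq[String] = Seq("COPY_ON_WRITE", "MERGE_ON_READ")
        val mergers: Seq[String] = Seq("Avro", "Spark")
        val fgReader: Seq[Boolean] = Seq(false, true)

        // Cartesian product: 2 table types x 2 mergers x 2 reader settings = 8 combinations.
        val combos: Seq[Combo] =
          for {
            t <- tableTypes
            m <- mergers
            r <- fgReader
          } yield Combo(m, r, t)

        def main(args: Array[String]): Unit =
          combos.foreach { c =>
            println(s"merger: ${c.merger}, useFGReader: ${c.useFGReader}, tableType: ${c.tableType}")
          }
      }
      ```

      Each printed line corresponds to one choice of the merger, useFGReader, and tableType values in the test above.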

       

      There are 4 different outcomes:

       

       

       

      merger: Avro, useFGReader: false, tableType: cow
      merger: Avro, useFGReader: true, tableType: cow
      merger: Spark, useFGReader: false, tableType: cow
      merger: Spark, useFGReader: true, tableType: cow

      +---+---+--------+---------+-----+------+
      |ts |key|rider   |driver   |fare |number|
      +---+---+--------+---------+-----+------+
      |11 |1  |rider-X |driver-X |19.1 |9     |
      |14 |5  |rider-Z |driver-Z |17.85|3     |
      |10 |3  |rider-C |driver-C |33.9 |10    |
      |10 |2  |rider-B |driver-B |27.7 |1     |
      |-10|4  |rider-DD|driver-DD|34.15|5     |
      +---+---+--------+---------+-----+------+

       

       

       
      merger: Avro, useFGReader: false, tableType: mor

      +---+---+-------+--------+-----+------+
      |ts |key|rider  |driver  |fare |number|
      +---+---+-------+--------+-----+------+
      |11 |1  |rider-X|driver-X|19.1 |9     |
      |14 |5  |rider-Z|driver-Z|17.85|3     |
      |-1 |4  |rider-D|driver-D|34.15|6     |
      |10 |3  |rider-C|driver-C|33.9 |10    |
      |10 |2  |rider-B|driver-B|27.7 |1     |
      +---+---+-------+--------+-----+------+ 

       

       

       

      merger: Avro, useFGReader: true, tableType: mor

      +---+---+-------+--------+-----+------+
      |ts |key|rider  |driver  |fare |number|
      +---+---+-------+--------+-----+------+
      |11 |1  |rider-X|driver-X|19.1 |9     |
      |14 |5  |rider-Z|driver-Z|17.85|3     |
      |10 |3  |rider-C|driver-C|33.9 |10    |
      |10 |2  |rider-B|driver-B|27.7 |1     |
      +---+---+-------+--------+-----+------+ 

       

       
      merger: Spark, useFGReader: false, tableType: mor
      merger: Spark, useFGReader: true, tableType: mor

      java.lang.NullPointerException

      --------------------------------------------------------------------------------------------
      The following combination behaves even more strangely:

      merger: Avro, useFGReader: false, tableType: mor 

      If I change the precombine value (ts) of record 4 in the initial insert from -1 to -6, we get:

      +---+---+-------+--------+-----+------+
      |ts |key|rider  |driver  |fare |number|
      +---+---+-------+--------+-----+------+
      |11 |1  |rider-X|driver-X|19.1 |9     |
      |14 |5  |rider-Z|driver-Z|17.85|3     |
      |-6 |4  |rider-D|driver-D|34.15|6     |
      |10 |3  |rider-C|driver-C|33.9 |10    |
      |10 |2  |rider-B|driver-B|27.7 |1     |
      +---+---+-------+--------+-----+------+ 

      However, if I then also remove the final write (secondUpdates), the output is:

      +---+---+-------+--------+-----+------+
      |ts |key|rider  |driver  |fare |number|
      +---+---+-------+--------+-----+------+
      |11 |1  |rider-X|driver-X|19.1 |9     |
      |10 |5  |rider-E|driver-E|17.85|10    |
      |10 |3  |rider-C|driver-C|33.9 |10    |
      |10 |2  |rider-B|driver-B|27.7 |1     |
      +---+---+-------+--------+-----+------+ 


            People

              linliu Lin Liu
              jonvex Jonathan Vexler
              Jonathan Vexler, sivabalan narayanan, Y Ethan Guo