Apache Hudi / HUDI-1461

Bulk insert v2 creates additional small files


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Labels: performance

    Description

      I took a look at the data preparation step for bulk insert and found that the current logic creates additional small files when performing bulk insert v2, which hurts performance.

      The current logic first sorts the input dataframe and then coalesces it: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java#L104-L106
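
      For reference, the linked helper essentially boils down to the following (a simplified Scala paraphrase, not the exact code; populatedDf and bulkInsertShuffleParallelism stand in for the helper's inputs, and the sort keys are the Hudi metadata columns):

      // simplified paraphrase: global sort by partition path and record key,
      // then coalesce down to the configured bulk-insert shuffle parallelism
      val prepared = populatedDf
        .sort("_hoodie_partition_path", "_hoodie_record_key")
        .coalesce(bulkInsertShuffleParallelism)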

      For example, suppose we set BulkInsertShuffleParallelism to 2 and take the following df as input:

      // assumes a spark-shell session, so spark.implicits._ (for toDF) is in scope
      import org.apache.spark.sql.functions
      import org.apache.spark.sql.functions.spark_partition_id

      val df = Seq(
        (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
        (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
        (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
        (108, "event_name_18", "2015-01-01T11:51:33.340396Z", "type1"),
        (109, "event_name_19", "2014-01-01T11:51:33.340396Z", "type3"),
        (110, "event_name_20", "2014-02-01T11:51:33.340396Z", "type3"),
        (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
      ).toDF("event_id", "event_name", "event_ts", "event_type")
      

      (Here I add a partitionID column to show which Spark partition each row lands in.) Based on the current logic, after the sort and coalesce the dataframe becomes:

      val df2 = df.sort(functions.col("event_type"), functions.col("event_id")).coalesce(2)
      df2.withColumn("partitionID", spark_partition_id()).show(false)
      
      +--------+--------------+---------------------------+----------+-----------+
      |event_id|event_name    |event_ts                   |event_type|partitionID|
      +--------+--------------+---------------------------+----------+-----------+
      |100     |event_name_16 |2015-01-01T13:51:39.340396Z|type1     |0          |
      |108     |event_name_18 |2015-01-01T11:51:33.340396Z|type1     |0          |
      |105     |event_name_678|2015-01-01T13:51:42.248818Z|type2     |0          |
      |110     |event_name_20 |2014-02-01T11:51:33.340396Z|type3     |0          |
      |104     |event_name_123|2015-01-01T12:15:00.512679Z|type1     |1          |
      |101     |event_name_546|2015-01-01T12:14:58.597216Z|type2     |1          |
      |109     |event_name_19 |2014-01-01T11:51:33.340396Z|type3     |1          |
      +--------+--------------+---------------------------+----------+-----------+
      

      You can see that the coalesce result does not actually follow the sort: coalesce merges the upstream range partitions without a shuffle and without regard to their key ranges, so each Spark partition ends up containing rows from all 3 Hudi partitions (event types).

      So during the write phase, each Spark executor processes its corresponding Spark partition and creates 3 files under the 3 Hudi partitions, leaving two parquet files under each Hudi partition. With such a small dataset we should ideally have a single file under each Hudi partition.
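
      A quick way to see this from df2 above: count rows per (event_type, partitionID) pair; during the write, each non-empty pair becomes one parquet file under that Hudi partition.

      // two Spark partitions x three event types = six non-empty pairs,
      // i.e. two parquet files under each Hudi partition
      df2.withColumn("partitionID", spark_partition_id())
        .groupBy("event_type", "partitionID")
        .count()
        .show(false)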

      If I change the sort to a repartition on the Hudi partition column (event_type):

      val df3 = df.repartition(functions.col("event_type")).coalesce(2)
      df3.withColumn("partitionID", spark_partition_id()).show(false)
      
      +--------+--------------+---------------------------+----------+-----------+
      |event_id|event_name    |event_ts                   |event_type|partitionID|
      +--------+--------------+---------------------------+----------+-----------+
      |100     |event_name_16 |2015-01-01T13:51:39.340396Z|type1     |0          |
      |104     |event_name_123|2015-01-01T12:15:00.512679Z|type1     |0          |
      |108     |event_name_18 |2015-01-01T11:51:33.340396Z|type1     |0          |
      |101     |event_name_546|2015-01-01T12:14:58.597216Z|type2     |1          |
      |105     |event_name_678|2015-01-01T13:51:42.248818Z|type2     |1          |
      |109     |event_name_19 |2014-01-01T11:51:33.340396Z|type3     |1          |
      |110     |event_name_20 |2014-02-01T11:51:33.340396Z|type3     |1          |
      +--------+--------------+---------------------------+----------+-----------+
      

      In this case, we get a single file under each Hudi partition.

       

      But according to our understanding, we still need the sort so that we can benefit from the min/max record key index. So the problem is how to handle both requirements correctly.

      Repartitioning and then sorting within each partition might be a way, though sorting within each partition might cause OOM issues if the data is unbalanced.
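
      A minimal sketch of that idea (just a suggestion, not an agreed fix): repartition by the Hudi partition column, then sortWithinPartitions so each output file still covers a contiguous, sorted key range. Here event_type and event_id stand in for the Hudi partition path and record key:

      // keep each Hudi partition's rows together, then sort inside each Spark
      // partition so the min/max record key index stays effective
      val df4 = df
        .repartition(2, functions.col("event_type"))
        .sortWithinPartitions(functions.col("event_type"), functions.col("event_id"))
      df4.withColumn("partitionID", spark_partition_id()).show(false)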


    People

      Assignee: Unassigned
      Reporter: Wenning Ding (wenningd)
      Votes: 0
      Watchers: 2
