Apache Hudi / HUDI-7506

Compute offsetRanges based on eventsPerPartition allocated in each range



    Description

      The current logic for computing offset ranges leads to skews and negative offsets because of the way they are calculated.
      https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L144 

      Problems faced:
      1. eventsPerPartition is calculated based on the available partitions that are not yet exhausted. This can lead to skews where one partition handles only 1-10 messages while another handles 100K messages; the idea of minPartitions is to increase parallelism and ensure that each Spark task reads approximately the same number of events.
      2. remainingPartitions can become negative when finalRanges exceeds the size of minPartitions.
      3. There is a complicated fork in the code when minPartitions > toOffsetsMap. This is not required IMO, and the default minPartitions can always fall back to toOffsetsMap.size(); this also takes care of situations where the number of partitions in Kafka increases.
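
      As a numeric illustration of problem 1 (the backlog figures and class name below are hypothetical, not taken from KafkaOffsetGen): a single global cap derived from the total backlog bounds every Spark task, whereas a naive one-range-per-Kafka-partition split lets one task read the entire large partition.

      ```java
      import java.util.Arrays;

      public class SkewIllustration {
          public static void main(String[] args) {
              // Hypothetical backlog: partition 0 has 100,000 new events, partition 1 has 10.
              long[] backlog = {100_000L, 10L};
              long minPartitions = 4;

              // Naive one-range-per-Kafka-partition split: the largest Spark task
              // reads the whole 100K backlog of partition 0.
              long naiveMaxTask = Arrays.stream(backlog).max().getAsLong();

              // Global cap from the total backlog, as in the proposed approach:
              long total = Arrays.stream(backlog).sum();
              long eventsPerPartition = Math.max(1L, total / minPartitions);

              System.out.println("naive max task = " + naiveMaxTask);       // 100000
              System.out.println("balanced cap   = " + eventsPerPartition); // 25002
          }
      }
      ```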

       

      New Approach
      1. Find eventsPerPartition, which would be Math.max(1L, actualNumEvents / minPartitions);
      2. Keep computing offsetRanges while allocatedEvents < actualNumEvents; compute them in a round-robin manner, capping each range at eventsPerPartition messages.
      3. Return all the offsetRanges at the end, after sorting them by partition.
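
      The steps above can be sketched as follows. This is a simplified stand-in, not the actual patch: `Range`, `computeRanges`, and the array-based argument layout are hypothetical (the real code works with Spark's OffsetRange and per-TopicPartition offset maps).

      ```java
      import java.util.ArrayList;
      import java.util.Comparator;
      import java.util.List;

      public class OffsetRangeSketch {
          // Hypothetical simplified stand-in for Spark's OffsetRange.
          record Range(int partition, long from, long to) {
              long count() { return to - from; }
          }

          static List<Range> computeRanges(long[] fromOffsets, long[] toOffsets, long minPartitions) {
              int numPartitions = fromOffsets.length;
              long actualNumEvents = 0;
              for (int p = 0; p < numPartitions; p++) {
                  actualNumEvents += toOffsets[p] - fromOffsets[p];
              }
              // Default minPartitions falls back to the number of Kafka partitions.
              long parts = Math.max(minPartitions, numPartitions);
              // Step 1: one global cap so every range holds roughly the same count.
              long eventsPerPartition = Math.max(1L, actualNumEvents / parts);

              long[] cursor = fromOffsets.clone();
              List<Range> ranges = new ArrayList<>();
              long allocatedEvents = 0;
              // Step 2: round-robin over partitions while events remain,
              // capping each range at eventsPerPartition messages.
              while (allocatedEvents < actualNumEvents) {
                  for (int p = 0; p < numPartitions; p++) {
                      long remaining = toOffsets[p] - cursor[p];
                      if (remaining <= 0) continue; // partition exhausted
                      long take = Math.min(eventsPerPartition, remaining);
                      ranges.add(new Range(p, cursor[p], cursor[p] + take));
                      cursor[p] += take;
                      allocatedEvents += take;
                  }
              }
              // Step 3: return the ranges sorted by partition.
              ranges.sort(Comparator.comparingInt(Range::partition));
              return ranges;
          }
      }
      ```

      With the skewed example above (100,000 events in partition 0, 10 in partition 1, minPartitions = 4), this yields five ranges, none larger than the 25,002-event cap, instead of one 100K range.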


              People

                Assignee: Vinish Reddy (vinish_jail97)
                Reporter: Vinish Reddy (vinish_jail97)
                Votes: 0
                Watchers: 1
