Kafka / KAFKA-9703

ProducerBatch.split takes up too many resources if the bigBatch is huge

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.6.0
    • Component/s: None
    • Labels: None
    • Flags: Patch

      Description

      ProducerBatch.split consumes too many resources and can cause an OutOfMemoryError when the bigBatch is huge. How I found this issue is described in https://lists.apache.org/list.html?users@kafka.apache.org:lte=1M:MESSAGE_TOO_LARGE

      The following loop in ProducerBatch.split is the code that consumes the resources:

          for (Record record : recordBatch) {
              assert thunkIter.hasNext();
              Thunk thunk = thunkIter.next();
              if (batch == null)
                  batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);

              // A newly created batch can always host the first message.
              if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
                  // The full batch is added to the result but stays open for appends.
                  batches.add(batch);
                  batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
                  batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
              }
          }

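      As RecordAccumulator#tryAppend already does, we can call closeForRecordAppends() as soon as a batch is full, so that each full batch releases the resources it holds for record appends instead of keeping them until the whole split finishes.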

          private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                               Callback callback, Deque<ProducerBatch> deque, long nowMs) {
              ProducerBatch last = deque.peekLast();
              if (last != null) {
                  FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
                  if (future == null)
                      last.closeForRecordAppends();
                  else
                      return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
              }
              return null;
          }
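
      The effect of closing each full batch during a split can be sketched with a toy model. This is not Kafka code: ToyBatch, split, peakFor, and the 1024-byte buffer are hypothetical stand-ins, and the model assumes that every open batch holds one append-time buffer until closeForRecordAppends() is called on it.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    /** Hypothetical stand-in for ProducerBatch; not a Kafka class. */
    static final class ToyBatch {
        static int openBuffers = 0;      // batches currently holding an append buffer
        static int peakOpenBuffers = 0;  // high-water mark of openBuffers

        private final int capacity;
        private final List<String> records = new ArrayList<>();
        private byte[] appendBuffer;     // models resources needed only while appending

        ToyBatch(int capacity, int bufferBytes) {
            this.capacity = capacity;
            this.appendBuffer = new byte[bufferBytes];
            openBuffers++;
            peakOpenBuffers = Math.max(peakOpenBuffers, openBuffers);
        }

        /** Returns false when the batch is closed or already full. */
        boolean tryAppend(String record) {
            if (appendBuffer == null || records.size() >= capacity)
                return false;
            records.add(record);
            return true;
        }

        /** Releases the append buffer; the stored records stay readable. */
        void closeForRecordAppends() {
            if (appendBuffer != null) {
                appendBuffer = null;
                openBuffers--;
            }
        }
    }

    /** Mirrors the shape of the split loop; closeWhenFull toggles the proposed change. */
    static List<ToyBatch> split(List<String> bigBatch, int capacity, boolean closeWhenFull) {
        List<ToyBatch> batches = new ArrayList<>();
        ToyBatch batch = null;
        for (String record : bigBatch) {
            if (batch == null)
                batch = new ToyBatch(capacity, 1024);
            if (!batch.tryAppend(record)) {
                batches.add(batch);
                if (closeWhenFull)
                    batch.closeForRecordAppends(); // release before allocating the next batch
                batch = new ToyBatch(capacity, 1024);
                batch.tryAppend(record);
            }
        }
        if (batch != null)
            batches.add(batch);
        return batches;
    }

    /** Resets the counters, splits recordCount records, and returns the peak open-buffer count. */
    static int peakFor(int recordCount, int capacity, boolean closeWhenFull) {
        ToyBatch.openBuffers = 0;
        ToyBatch.peakOpenBuffers = 0;
        List<String> bigBatch = new ArrayList<>();
        for (int i = 0; i < recordCount; i++)
            bigBatch.add("r" + i);
        split(bigBatch, capacity, closeWhenFull);
        return ToyBatch.peakOpenBuffers;
    }

    public static void main(String[] args) {
        System.out.println("peak open buffers without close: " + peakFor(1000, 10, false));
        System.out.println("peak open buffers with close:    " + peakFor(1000, 10, true));
    }
}
```

      In this model, splitting 1000 records into batches of 10 keeps 100 buffers alive at once without the close call and only 1 with it. The real saving in Kafka depends on what each ProducerBatch actually holds while it is open for appends.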
      

              People

              • Assignee: adally jiamei xie
              • Reporter: adally jiamei xie
              • Votes: 0
              • Watchers: 3
