Kafka / KAFKA-8350

Splitting batches should consider topic-level message size


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: producer
    • Labels: None

    Description

      Currently, the producer splits batches based on the batch size alone. However, the split can never succeed when the batch size is much larger than the topic-level max message size.

      For instance, if the batch size is set to 8 MB but the broker-side `message.max.bytes` keeps its default value (1000012, about 1 MB), the producer will endlessly try to split a large batch but never succeed, as shown below:

      [2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce response in correlation id 61 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617)
      [2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce response in correlation id 62 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617)
      [2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce response in correlation id 63 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617)
      [2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce response in correlation id 64 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617)
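A small self-contained sketch illustrates why the retry loop never terminates. The `split` method below is a simplified greedy model of batch splitting, not Kafka's actual `ProducerBatch.split` implementation: when the split size (here, `batch.size`) is at least as large as the batch itself, the split returns a single batch and makes no progress, whereas splitting by the broker's limit would succeed.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    // Greedily pack record sizes into batches of at most splitSize bytes.
    // A simplified model of batch splitting, assumed for illustration.
    static List<Integer> split(List<Integer> recordSizes, int splitSize) {
        List<Integer> batches = new ArrayList<>();
        int current = 0;
        for (int size : recordSizes) {
            if (current > 0 && current + size > splitSize) {
                batches.add(current); // flush the current batch
                current = 0;
            }
            current += size;
        }
        if (current > 0) batches.add(current);
        return batches;
    }

    public static void main(String[] args) {
        // A 2 MB batch made of four 512 KB records:
        List<Integer> records =
                List.of(512 * 1024, 512 * 1024, 512 * 1024, 512 * 1024);
        // Splitting by an 8 MB batch.size yields one batch -> no progress:
        System.out.println(split(records, 8 * 1024 * 1024).size()); // 1
        // Splitting by the broker default (1000012 bytes) actually shrinks it:
        System.out.println(split(records, 1000012).size()); // 4
    }
}
```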

      A better solution is to have the producer split based on the minimum of these two configs. However, it is tricky for the client to obtain the topic-level or broker-level config values. There seem to be three ways to do this:

      1. When the broker throws `RecordTooLargeException`, do not swallow its real message, since it already contains the max message size. If the message is not swallowed, the client can easily read the limit from the response.
      2. Add code to issue a `DescribeConfigsRequest` to retrieve the value.
      3. If splitting failed, decrease the batch size gradually until the split succeeds. For example,
      // In RecordAccumulator.java
      private int steps = 1;
      ......
      public int splitAndReenqueue(ProducerBatch bigBatch) {
      ......
          // Shrink the split size each time a split makes no progress.
          Deque<ProducerBatch> dq = bigBatch.split(this.batchSize / steps);
          if (dq.size() == 1) // split failed: the batch did not get smaller
              steps++;
      ......
      }

      Do all of these make sense?



      People

        Assignee: Unassigned
        Reporter: huxihx
        Votes: 2
        Watchers: 3
