  1. Kafka
  2. KAFKA-10335

Blocking of producer IO thread when calling send() from callback

Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: clients, producer
    • Labels: None

    Description

      We had an application that used KafkaProducer to deliver the results of some work. Sometimes delivery failed because of network connectivity errors or maintenance on the broker side. In such cases we wanted the application to send an event containing the error and the original message details, and we wanted those errors delivered to a separate topic. So we passed a callback to send() that used the same producer instance and called send() from inside the callback.

      The application worked for some time, but then we found that its producer was stuck: almost no CPU consumption, and batches expiring for hours. After attaching a debugger, it turned out that the sender IO thread was blocked. When a record expired, its callback was invoked. The callback called send() for a new topic whose metadata was not yet in the producer's client cache. When send() is missing metadata, it is allowed to block for up to max.block.ms, which is 60 seconds by default. If the application is active, a large number of records accumulates quickly, and every expired record blocks the IO thread for up to 60 seconds, so the sender IO thread is essentially blocked. In the producer, only the sender IO thread calls the client's poll() method, which is responsible for all network communication and metadata updates. If poll() is executed with a significant delay, the result is errors related to various timeouts. The end result is a stuck producer with little chance of recovering.
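      The stall described above can be modelled without Kafka. The following is only a sketch using the JDK: a single-threaded executor stands in for the sender IO thread, and Thread.sleep() stands in for a send() call waiting on metadata. The class name BlockingCallbackDemo and the method runCallbacks are hypothetical and are not part of the Kafka client.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not Kafka code: one thread runs every callback in turn.
public class BlockingCallbackDemo {

    // Runs `callbacks` blocking callbacks on a single thread and returns the
    // total time in milliseconds until the last one has finished.
    static long runCallbacks(int callbacks, long blockMs) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        long start = System.nanoTime();
        for (int i = 0; i < callbacks; i++) {
            ioThread.submit(() -> {
                try {
                    // Stand-in for send() waiting up to max.block.ms for metadata.
                    Thread.sleep(blockMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        ioThread.shutdown();
        try {
            ioThread.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        long elapsed = runCallbacks(5, 100);
        System.out.println("5 blocking callbacks occupied the single thread for " + elapsed + " ms");
    }
}
```

      Because the callbacks run one after another on the same thread, the delays add up. With a 60 second block per callback, anything else that thread is responsible for (in the producer, the poll() call) would be delayed by the same amount.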

      To summarise, the prerequisites for the problem are: sending from a callback, using the same producer instance, and using a topic that the producer has not seen before.

      I think it is important to decide whether this is a misuse of KafkaProducer or a bug in it. Code in callbacks shouldn't block, that is clear, but at the same time no one expects an already initialised producer to block.

      Depending on the decision, I could produce a fix. It could be either a warning when the user calls send() from a callback, or a reduction of the maximum allowed blocking time for a metadata update. It could also be just a documentation change, or even nothing.
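      Independently of any fix in the client, one possible application-side workaround is to keep the callback non-blocking by handing the error event to a separate thread. This is a sketch under assumptions, not something proposed or confirmed in this issue: ErrorHandoff, onFailure, and slowSender are hypothetical names, and the Consumer<String> stands in for whatever actually calls producer.send() for the errors topic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: moves potentially blocking error sends off the calling thread.
public class ErrorHandoff {
    private final ExecutorService errorExecutor = Executors.newSingleThreadExecutor();
    private final Consumer<String> errorSender; // assumed to wrap the real send() for the errors topic

    ErrorHandoff(Consumer<String> errorSender) {
        this.errorSender = errorSender;
    }

    // Intended to be called from the producer callback: it only enqueues work.
    // Note that the executor's queue is unbounded, so a real application would
    // need its own limit or drop policy.
    void onFailure(String details) {
        errorExecutor.execute(() -> errorSender.accept(details));
    }

    // Waits for queued error events to be handed to the sender.
    void close() {
        errorExecutor.shutdown();
        try {
            errorExecutor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Test double: a sender that blocks for blockMs before recording the event.
    static Consumer<String> slowSender(long blockMs, List<String> sink) {
        return details -> {
            try {
                Thread.sleep(blockMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            sink.add(details);
        };
    }

    public static void main(String[] args) {
        List<String> delivered = new CopyOnWriteArrayList<>();
        ErrorHandoff handoff = new ErrorHandoff(slowSender(300, delivered));
        long start = System.nanoTime();
        handoff.onFailure("record expired");
        long callbackMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        handoff.close();
        System.out.println("callback returned after " + callbackMs + " ms, delivered: " + delivered);
    }
}
```

      With this shape the blocking happens on the helper thread, so the thread that invoked the callback returns immediately. It does not remove the underlying wait for metadata; it only moves it away from the IO thread.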

      Here is code to reproduce the issue; the output it produces follows the code snippet. Tested against Confluent Cloud from my desktop over a 100 Mbps connection.

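          import java.io.FileReader;
          import java.io.IOException;
          import java.util.Properties;
          import org.apache.kafka.clients.producer.Callback;
          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.ProducerRecord;
          import org.apache.kafka.clients.producer.RecordMetadata;

          // The class name is arbitrary; it only makes the snippet compilable.
          public class Repro {
              public static void main(String[] args) throws IOException {
                  byte[] blob = new byte[262144];
                  Properties properties = new Properties();
                  // Connection and credential settings come from a local properties file.
                  try (FileReader reader = new FileReader("kafka-staging.properties")) {
                      properties.load(reader);
                  }
                  properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                  properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
                  // Short timeouts so that batches expire quickly and the callback sees errors.
                  properties.setProperty("request.timeout.ms", "5000");
                  properties.setProperty("delivery.timeout.ms", "5000");
                  KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
                  // Send continuously; the loop never exits, so the producer is never closed.
                  while (true) {
                      ProducerRecord<String, byte[]> record = new ProducerRecord<>("alex-test-valid-data", blob);
                      producer.send(record, new Callback() {
                          @Override
                          public void onCompletion(RecordMetadata metadata, Exception exception) {
                              if (exception != null) {
                                  System.err.println(exception);
                                  long start = System.currentTimeMillis();
                                  // Same producer instance, different topic, called on the sender IO thread.
                                  ProducerRecord<String, byte[]> errorRecord = new ProducerRecord<>("alex-test-errors", blob);
                                  producer.send(errorRecord);  // blocking caused by metadata update
                                  long timeElapsed = System.currentTimeMillis() - start;
                                  System.err.println("time spent blocking IO thread: " + timeElapsed);
                              }
                          }
                      });
                  }
              }
          }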
      
      [2020-07-31 14:35:51,936: INFO/main] (AbstractConfig.java:347) - ProducerConfig values: 
      	acks = 1
      	batch.size = 16384
      	bootstrap.servers = [pkc-l915e.europe-west1.gcp.confluent.cloud:9092]
      	buffer.memory = 33554432
      	client.dns.lookup = default
      	client.id = 
      	compression.type = none
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 5000
      	enable.idempotence = false
      	interceptor.classes = []
      	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      	linger.ms = 0
      	max.block.ms = 60000
      	max.in.flight.requests.per.connection = 5
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      	receive.buffer.bytes = 32768
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 5000
      	retries = 2147483647
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = [hidden]
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = PLAIN
      	security.protocol = SASL_SSL
      	security.providers = null
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	transaction.timeout.ms = 60000
      	transactional.id = null
      	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
      
      [2020-07-31 14:35:52,099: INFO/main] (AbstractLogin.java:61) - Successfully logged in.
      [2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:117) - Kafka version: 5.4.0-ccs
      [2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:118) - Kafka commitId: f4201a82bea68cc7
      [2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:119) - Kafka startTimeMs: 1596198952287
      [2020-07-31 14:35:52,853: INFO/kafka-producer-network-thread | producer-1] (Metadata.java:261) - [Producer clientId=producer-1] Cluster ID: lkc-43m2m
      org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-0:5001 ms has passed since batch creation
      org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
      time spent blocking IO thread: 60001
      org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
      time spent blocking IO thread: 60002
      time spent blocking IO thread: 60017
      org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
      [2020-07-31 14:38:07,219: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-3 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
      org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
      time spent blocking IO thread: 60003
      [2020-07-31 14:39:07,223: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
      org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
      time spent blocking IO thread: 60002
      time spent blocking IO thread: 60001
      [2020-07-31 14:40:07,224: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-5 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
      org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
      org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
      time spent blocking IO thread: 60001
      [2020-07-31 14:41:07,225: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-1 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
      org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
      time spent blocking IO thread: 60004
      time spent blocking IO thread: 60004
      [2020-07-31 14:42:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
      org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
      org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
      time spent blocking IO thread: 60000
      [2020-07-31 14:43:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-2 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
      org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422600 ms has passed since batch creation
      time spent blocking IO thread: 60003
      time spent blocking IO thread: 60001
      org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422490 ms has passed since batch creation
      org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
      time spent blocking IO thread: 60002
      org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422315 ms has passed since batch creation
      time spent blocking IO thread: 60003
      time spent blocking IO thread: 60003
      org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422124 ms has passed since batch creation
      


          People

            Assignee: Unassigned
            Reporter: Alexander Sibiryakov (sibiryakov)