Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-17025

KAFKA-17025: Producer throws uncaught exception in the io thread.

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.6.2
    • None
    • clients
    • None

    Description

      When I use KafkaProducer, OOM occurs but the KafkaProducer only log it but do nothing:

       

      ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 'kafka-producer-network-thread | producer-l': java.lang.Out0fMemoryError: Direct buffer memory .....
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
      at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) 
      at org.apache.kafka.clients.producer.internals.Sender.run 
      at java.Lang.Thread.run
      

       

       

      I try to find what happens:

      1. It seems that OutOfMemoryError as a Error is not captured when org.apache.kafka.clients.producer.internals.Sender#run try to catch a Exception: 

      @Override
      public void run() {
          log.debug("Starting Kafka producer I/O thread.");
      
          // main loop, runs until close is called
          while (running) {
              try {
                  runOnce();
              } catch (Exception e) {
                  log.error("Uncaught error in kafka producer I/O thread: ", e);
              }
          }
      
          log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
      
          // okay we stopped accepting requests but there may still be
          // requests in the transaction manager, accumulator or waiting for acknowledgment,
          // wait until these are completed.
          while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
              try {
                  runOnce();
              } catch (Exception e) {
                  log.error("Uncaught error in kafka producer I/O thread: ", e);
              }
          }
      
          // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
          while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
              if (!transactionManager.isCompleting()) {
                  log.info("Aborting incomplete transaction due to shutdown");
                  transactionManager.beginAbort();
              }
              try {
                  runOnce();
              } catch (Exception e) {
                  log.error("Uncaught error in kafka producer I/O thread: ", e);
              }
          }
      
          if (forceClose) {
              // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
              // the futures.
              if (transactionManager != null) {
                  log.debug("Aborting incomplete transactional requests due to forced shutdown");
                  transactionManager.close();
              }
              log.debug("Aborting incomplete batches due to forced shutdown");
              this.accumulator.abortIncompleteBatches();
          }
          try {
              this.client.close();
          } catch (Exception e) {
              log.error("Failed to close network client", e);
          }
      
          log.debug("Shutdown of Kafka producer I/O thread has completed.");
      }
      
      
      

       

      2. Then KafkaThread catch uncaught exception and just log it:

      public KafkaThread(final String name, Runnable runnable, boolean daemon) {
          super(runnable, name);
          configureThread(name, daemon);
      }
      
      private void configureThread(final String name, boolean daemon) {
          setDaemon(daemon);
          setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
      }

       

      To be honest, I don't understand why KafkaThread doing nothing but log it when an uncaught exception occurs? Why not exposing method to set setUncaughtExceptionHandler in KafkaThread or KafkaProducer so that user can determine what to do with uncaught exception, no matter thrown it or just ignore it?

      Attachments

        Issue Links

          Activity

            People

              loserwang1024 Hongshun Wang
              loserwang1024 Hongshun Wang
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated: