Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7678

Failed to close producer due to java.lang.NullPointerException

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.1.1, 2.0.1, 2.1.0
    • Fix Version/s: 1.1.2, 2.2.0, 2.1.1, 2.0.2
    • Component/s: streams
    • Labels:
      None

      Description

      This occurs when the group is rebalancing in a Kafka Stream application and the process (the Kafka Stream application) receives a SIGTERM to stop it gracefully.

       

       

      ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
      java.lang.NullPointerException
       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
       at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
       at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
       at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
       at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
       at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
       at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
       at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)

       

       

      Although I have checked the code and the method `maybeAbortTransactionAndCloseRecordCollector` in the `StreamTask.java` class is expecting any kind of error to happen since is catching `Throwable`.

       

       

       

      try {
       recordCollector.close();
      } catch (final Throwable e) {
       log.error("Failed to close producer due to the following error:", e);
      } finally {
       producer = null;
      }

       

      Should we consider this a bug?

      In my opinion, we could check for the `null` possibility at `RecordCollectorImpl.java` class:

      @Override
      public void close() {
       log.debug("Closing producer");
       producer.close();
       producer = null;
       checkForException();
      }

       

      Change it for:

       

      @Override
      public void close() {
       log.debug("Closing producer");
       if ( Objects.nonNull(producer) ) {
          producer.close();
          producer = null;
       }
       checkForException();
      }

       

      How does that sound?

       

      Kafka Brokers running 2.0.0

      Kafka Stream and client 2.1.0

      OpenJDK 8

       

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                pachilo Jonathan Santilli
                Reporter:
                pachilo Jonathan Santilli
              • Votes:
                0 Vote for this issue
                Watchers:
                5 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: