KAFKA-8412

Still a nullpointer exception thrown on shutdown while flushing before closing producers

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.1.1
    • Fix Version/s: 2.1.2, 2.2.2, 2.4.0, 2.3.1
    • Component/s: streams
    • Labels: None

    Description

      I found a closed issue and replied there, but decided to open a new one because, although they're related, they're slightly different. The original issue is at https://issues.apache.org/jira/browse/KAFKA-7678

      The fix there was to add a null check around closing the producer, because in some cases the producer is already null at that point (it has been closed already).

      In version 2.1.1 we are getting a very similar exception, but in the 'flush' method that is called pre-close. This is in the log:

      message: stream-thread [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed while closing StreamTask 1_26 due to the following error:
      logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
      
      java.lang.NullPointerException: null
          at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
          at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
          at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
          at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
          at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
          at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
          at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
          at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)

      Followed by:

      message: task [1_26] Could not close task due to the following error:
      logger_name: org.apache.kafka.streams.processor.internals.StreamTask
      
      java.lang.NullPointerException: null
          at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
          at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
          at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
          at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
          at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
          at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
          at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
          at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)

      If I look at the source code at this point, I see a nice null check in the close method, but not in the flush method that is called just before that:

      public void flush() {
          this.log.debug("Flushing producer");
          this.producer.flush();
          this.checkForException();
      }
      
      public void close() {
          this.log.debug("Closing producer");
          if (this.producer != null) {
              this.producer.close();
              this.producer = null;
          }
      
          this.checkForException();
      }

      To my (admittedly ignorant) eye, it seems the flush method should also be guarded by a null check, in the same way as has been done for close.
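
      For illustration, a guarded flush along the lines suggested above might look like this (a sketch that mirrors the existing guard in close(); not necessarily the fix that was eventually committed):

      public void flush() {
          this.log.debug("Flushing producer");
          if (this.producer != null) { // skip the flush if the producer has already been closed
              this.producer.flush();
          }
          this.checkForException();
      }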

    Attachments

    Issue Links

    Activity

            mjsax Matthias J. Sax added a comment -

            Thanks for reporting this. What I don't understand is why we would flush after we have already closed a task. Hence, I am not sure a null guard is the correct fix; we should rather make sure we don't call flush() in the first place.

            Can you maybe provide debug-level logs? They might help to understand the scenario better.

            Sebastiaan83 Sebastiaan added a comment -

            mjsax I can try to reproduce it in development some more, but so far we've only seen it in production.

            My theory is that it is similar to the other ticket; a comment there (https://issues.apache.org/jira/browse/KAFKA-7678?focusedCommentId=16715220&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16715220) says:

            "There are one or two edge cases which can cause record collector to be closed multiple times, we have noticed them recently and are thinking about cleanup the classes along the calling hierarchy (i.e. from Task Manager -> Task -> RecordCollector) for it. One example is:

            1) a task is suspended, with EOS turned on (like your case), the record collector is closed().
            2) then the instance got killed (SIGTERM), which causes all threads to be closed, which will then cause all their owned tasks to be closed. The same record collector close() call will be triggered again."

            So this could be the same issue, but now for flush rather than close: the producer is already flushed and closed, but the same thing is tried again. Of course, I don't know anything about the internals of the client, so take this with a grain of salt.
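
            For illustration, a minimal self-contained sketch of that double-close sequence (simplified stand-in names, not the real RecordCollectorImpl) reproduces the NPE:

            class RecordCollectorSketch {
                private StringBuilder producer = new StringBuilder(); // stand-in for the KafkaProducer

                void close() {
                    if (producer != null) { // guarded since the KAFKA-7678 fix
                        producer = null;
                    }
                }

                void flush() {
                    producer.length(); // unguarded: throws NullPointerException after close()
                }

                public static void main(String[] args) {
                    RecordCollectorSketch c = new RecordCollectorSketch();
                    c.close(); // step 1: suspend with EOS closes the collector
                    c.flush(); // step 2: shutdown flushes again -> NullPointerException
                }
            }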


            mjsax Matthias J. Sax added a comment -

            Do you run with EOS enabled?

            Also, closing() and flushing() are a little different... I am not saying there is no issue; I am just trying to figure out what the correct fix is. Just adding a `null` check could mask the root cause of the bug rather than fix the bug itself.

            Sebastiaan83 Sebastiaan added a comment -

            mjsax yes, we are using EOS.

            And yeah, you're right: I was surprised the other ticket was solved by a null check, when it also seemed to me the situation should be avoided in the first place. But I'll leave that to the actual developers.

            guozhang Guozhang Wang added a comment -

            mjsax I looked through the code and the original JIRA (https://issues.apache.org/jira/browse/KAFKA-7285) again, and I think the above case I mentioned still exists. The root cause is that in `suspend` we would close the record collector with EOS turned on, while in `close` we may want to flush (from state commit) and close the collector again.

            What I'm thinking is that we can refactor the fix in KAFKA-7285 such that in suspend we do the "close-and-then-recreate" completely, and leave only the `initTxn` call in the `resume` function. In that case, whenever we're closing, we are assured there's still an open producer. With this, we no longer need the closed-check in the close() function either.
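
            For illustration, a rough sketch of that idea (names and shape are assumed, not the actual StreamTask code):

            class EosTaskSketch {
                interface Collector { void flush(); void close(); }

                private final java.util.function.Supplier<Collector> collectorFactory;
                private Collector collector;

                EosTaskSketch(final java.util.function.Supplier<Collector> collectorFactory) {
                    this.collectorFactory = collectorFactory;
                    this.collector = collectorFactory.get();
                }

                void suspend() {
                    collector.close();                  // close the EOS producer on suspend...
                    collector = collectorFactory.get(); // ...and recreate it immediately
                }

                void resume() {
                    // only the transaction init (initTxn in the proposal) would remain here
                }

                void close() {
                    collector.flush(); // safe: the collector is never left null
                    collector.close();
                }
            }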


            mjsax Matthias J. Sax added a comment -

            Just cycling back to this.

            I am not sure we should do the "close-and-then-recreate" strategy. The issue seems to be that we blindly call `task#close()` for all tasks within `AssignedTasks#close()`. However, it seems that a proper fix would be to distinguish between active and suspended tasks, and call `closeSuspended()` for suspended ones and `close()` for all others?

            With regard to KAFKA-7678: re-reading the ticket, the report says that it should be a graceful shutdown via SIGTERM. Hence, I am wondering why the unclean shutdown path is actually executed (i.e., why do we call `maybeAbortTransactionAndCloseRecordCollector()` – this should only happen for an unclean shutdown). \cc pachilo, who reported and worked on the first NPE.


            pachilo Jonathan Santilli added a comment -

            guozhang also mentioned that we may need to refactor that part of the code in a comment on KAFKA-7678, which is aligned with your proposal mjsax.
            In your opinion mjsax, should we address that now, or could we first add a null check before calling the Producer#flush method and then do the refactor?


            mjsax Matthias J. Sax added a comment -

            I would rather refactor the code directly, because it seems cleaner. WDYT guozhang?

            cpettitt-confluent Chris Pettitt added a comment -

            Matthias, Guozhang proposed I pick up this ticket to get my feet wet. I hope you don't mind, but feel free to grab it back if you are very attached to it.


            mjsax Matthias J. Sax added a comment -

            Sure. I have not started working on a PR yet anyway.

            cpettitt-confluent Chris Pettitt added a comment -

            I'm able to repro this, and mjsax's solution should fix it.

            One other observation while I was in this code: we essentially have state spread out across classes. As a dev new to the code, I would expect to push most of the state for the task down into StreamTask and, only if we need fast lookup by state, keep an index of these in AssignedTasks, but treat this as an index and not as the authoritative state. In other words, I would prefer to see close on StreamTask do the right thing for the state it is in, instead of having AssignedTasks be responsible for different types of close. This should also make it easier to test without involving mocks, as we currently have to in AssignedTasks.

            So I see a few options:

            1. Use mjsax's solution and keep state as is.
            2. Push state down into StreamTask; AssignedTasks just calls close as it does today.
            3. Do #1 in a first patch and follow up with #2 in a second patch focused on refactoring state.

            mjsax guozhang thoughts?

            cpettitt-confluent Chris Pettitt added a comment -

            Having played with this a bit more, I think it's best to go for #1 for now, which is a point fix with a pretty small scope, i.e. low risk.

            I do see value in rethinking state a bit, particularly around ownership and transitions, perhaps as a separate ticket. As someone new coming to the code, it is difficult to work out at a glance which states and transitions are valid, especially because the logic is distributed across at least AssignedTasks, AbstractTask, and StreamTask.

            githubbot ASF GitHub Bot added a comment -

            cpettitt-confluent commented on pull request #7207: KAFKA-8412 [WIP]: Still a nullpointer exception thrown on shutdown while flushing before closing producers
            URL: https://github.com/apache/kafka/pull/7207

            Prior to this change an NPE is raised when calling AssignedTasks.close
            under the following conditions:

            1. EOS is enabled
            2. The task was in a suspended state

            The cause for the NPE is that when a clean close is requested for a
            StreamTask the StreamTask tries to commit. However, in the suspended
            state there is no producer so ultimately an NPE is thrown for the
            contained RecordCollector in flush.

            It is my opinion that in the long term, this (and probably other
            surprising state interactions) could be cleaned up by consolidating
            state into one place instead of spreading it across AssignedTasks,
            StreamTask, and AbstractTask. However, that is a much larger, more risky
            change, and this issue is currently considered minor.

            The fix put forth in this commit is to have AssignedTasks call
            closeSuspended when it knows the underlying StreamTask is suspended.
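
            A sketch of that dispatch, with assumed and simplified signatures (the real AssignedTasks/StreamTask methods differ; this is not the actual patch):

            interface TaskSketch {
                boolean isSuspended();
                void close(boolean clean);          // commits (and flushes) when clean
                void closeSuspended(boolean clean); // skips the commit: the producer is already closed
            }

            class AssignedTasksSketch {
                void closeTask(final TaskSketch task, final boolean clean) {
                    if (task.isSuspended()) {
                        task.closeSuspended(clean);
                    } else {
                        task.close(clean);
                    }
                }
            }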

            Currently the only externally visible way to detect this problem in test
            seems to be via logging. This is because the NPE is logged but then
            suppressed under the following sequence:

            RecordCollectorImpl.flush:266

            • throws NPE (producer is null)

            StreamTask.suspend:578

            • goes through the finally block and then reraises the NPE

            StreamTask.close:706

            • catches the NPE, calls closeSuspended with the NPE

            StreamTask.closeSuspended:676

            • rethrows the NPE after some cleanup

            AssignedTasks.close:341

            • catches and logs the exception
            • tries a "dirty" close (clean = true) which succeeds
            • firstException is NOT set because the test `!closeUnclean(task)`
              does not hold.

            It seems this is not the intended behavior? If so, I will happily
            correct that and stop using logging as a way to detect failure.

            Otherwise this commit does not currently pass checkstyle because I'm
            using blacklisted imports: `LogCaptureAppender` and its various
            dependencies from `log4j`. I would appreciate guidance as to whether we
            should whitelist these or use another technique for detection.

            Note also that this test is quite involved. I could have just tested
            that AssignedTasks calls closeSuspended when appropriate, but that is
            testing, IMO, a detail of the implementation and doesn't actually verify
            we reproduced the original problem as it was described. I feel much more
            confident that we are reproducing the behavior - and we can test exactly
            the conditions that lead to it - when testing across AssignedTasks and
            StreamTask. I believe this lends additional support to the argument for
            eventually consolidating the state that is split across classes.



            githubbot ASF GitHub Bot added a comment -

            guozhangwang commented on pull request #7207: KAFKA-8412: Still a nullpointer exception thrown on shutdown while flushing before closing producers
            URL: https://github.com/apache/kafka/pull/7207



            People

              Assignee: cpettitt-confluent Chris Pettitt
              Reporter: Sebastiaan83 Sebastiaan
              Votes: 0
              Watchers: 7

              Dates

                Created:
                Updated:
                Resolved: