Kafka / KAFKA-4787

KafkaStreams close() is not reentrant


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 0.10.2.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      While building a simple application, I tried to implement a failure policy where any uncaught exception terminates the application until an administrator can evaluate and intervene:

          /** Handle any uncaught exception by shutting down the program. */
          private void handleStreamException(Thread thread, Throwable t) {
              LOG.error("stream exception in thread {}", thread, t);
              streams.close();
          }
      
          streams.setUncaughtExceptionHandler(this::handleStreamException);
          streams.start();
      

      Unfortunately, because KafkaStreams#close() is synchronized on the KafkaStreams instance, this pattern is prone to deadlock:

      "StreamThread-1" #80 prio=5 os_prio=0 tid=0x00007f56096f4000 nid=0x40c8 waiting for monitor entry [0x00007f54f03ee000]
         java.lang.Thread.State: BLOCKED (on object monitor)
              at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java)
              - waiting to lock <0x00000000f171cda8> (a org.apache.kafka.streams.KafkaStreams)
              at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
              at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source)
              at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
              at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
              at com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541)
              at com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown Source)
              at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)
      
      "main" #1 prio=5 os_prio=0 tid=0x00007f5608011000 nid=0x3f76 in Object.wait() [0x00007f5610f04000]
         java.lang.Thread.State: WAITING (on object monitor)
              at java.lang.Object.wait(Native Method)
              at java.lang.Thread.join(Thread.java:1249)
              - locked <0x00000000fd302bf0> (a java.lang.Thread)
              at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494)
              - locked <0x00000000f171cda8> (a org.apache.kafka.streams.KafkaStreams)
              at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
              at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source)
              at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
              at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
      

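      Note the sequence: the main thread calls close(), which acquires the KafkaStreams monitor and then joins StreamThread-1. During shutdown, an exception is thrown on StreamThread-1, and the JVM dispatches it to the uncaught-exception handler on that same thread; the handler calls close() again. That second call blocks trying to take the monitor, so main is waiting for StreamThread-1 to terminate while StreamThread-1 is waiting for main to release the monitor.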
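      The lock cycle can be reproduced without Kafka. The sketch below uses a hypothetical FakeStreams class (not a Kafka API) whose synchronized close() signals a worker thread to stop and then joins it while still holding the monitor, mirroring the frames in the thread dump. The worker throws during shutdown, and its uncaught-exception handler calls close() again. To keep the demo from hanging, close() takes a timeout (an assumption made only for this demo), so the deadlock shows up as the worker still being BLOCKED when the bounded join gives up.

```java
import java.util.concurrent.CountDownLatch;

/**
 * Kafka-free reproduction of the lock cycle in KAFKA-4787.
 * FakeStreams is a hypothetical stand-in for KafkaStreams: its close() is
 * synchronized and joins the worker thread while holding the monitor.
 */
public class ReentrantCloseDemo {

    static final class FakeStreams {
        private final CountDownLatch shutdownSignal = new CountDownLatch(1);
        private final Thread worker;
        volatile Thread.State workerStateAfterJoin;

        FakeStreams() {
            worker = new Thread(() -> {
                try {
                    shutdownSignal.await(); // "process records" until close() is called
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Simulate an exception thrown while the worker is shutting down.
                throw new IllegalStateException("failure during shutdown");
            }, "StreamThread-1");
        }

        void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
            worker.setUncaughtExceptionHandler(handler);
        }

        void start() {
            worker.start();
        }

        Thread worker() {
            return worker;
        }

        /** Returns true if the worker terminated within timeoutMs. */
        synchronized boolean close(long timeoutMs) {
            shutdownSignal.countDown();
            if (Thread.currentThread() == worker) {
                return false; // a thread cannot wait for its own termination
            }
            try {
                worker.join(timeoutMs); // waits while still holding this object's monitor
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            workerStateAfterJoin = worker.getState();
            return !worker.isAlive();
        }
    }

    /** Runs the scenario; returns the worker's state when the bounded join gave up. */
    static Thread.State run(long timeoutMs) throws InterruptedException {
        FakeStreams streams = new FakeStreams();
        // Same failure policy as the report: the handler calls close() on the failing thread.
        streams.setUncaughtExceptionHandler((thread, t) -> streams.close(timeoutMs));
        streams.start();
        boolean terminated = streams.close(timeoutMs);
        // Once main releases the monitor, the handler's close() can proceed and the worker exits.
        streams.worker().join();
        return terminated ? Thread.State.TERMINATED : streams.workerStateAfterJoin;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker state when close() gave up: " + run(1000));
    }
}
```

      Running main should report BLOCKED after roughly one second: main held the FakeStreams monitor while joining StreamThread-1, and StreamThread-1 was waiting for that same monitor inside the handler. Without the timeout, neither thread would ever make progress.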

      Arguably it is a bit abusive to call close() this way (it certainly was not intentional), but to be robust, Kafka Streams should handle any sequence of close() invocations gracefully, including a reentrant call from an uncaught-exception handler.
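      One way a close() could tolerate reentrant and repeated calls is sketched below on a hypothetical SafeStreams class (again not KafkaStreams): hold no lock while waiting for the worker, let only the first caller initiate shutdown, and skip the join when close() runs on the thread being shut down. This illustrates the idea under those assumptions; it is not the change the Kafka project adopted (this issue was resolved as a duplicate).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch of a close() that tolerates reentrant and repeated calls.
 * SafeStreams is hypothetical; it is not KafkaStreams.
 */
public class SafeCloseSketch {

    static final class SafeStreams {
        private final AtomicBoolean closeRequested = new AtomicBoolean(false);
        private final CountDownLatch shutdownSignal = new CountDownLatch(1);
        private final Thread worker;

        SafeStreams() {
            worker = new Thread(() -> {
                try {
                    shutdownSignal.await(); // run until shutdown is requested
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Simulate an exception thrown while the worker is shutting down.
                throw new IllegalStateException("failure during shutdown");
            }, "StreamThread-1");
        }

        void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
            worker.setUncaughtExceptionHandler(handler);
        }

        void start() {
            worker.start();
        }

        /**
         * Deliberately not synchronized: no lock is held while waiting for the worker.
         * Returns true if the worker has terminated when this call returns.
         */
        boolean close(long timeoutMs) {
            // Only the first caller initiates shutdown; later callers just wait (or return).
            if (closeRequested.compareAndSet(false, true)) {
                shutdownSignal.countDown();
            }
            // Called on the worker itself (e.g. from its uncaught-exception handler):
            // it cannot wait for its own termination, so return immediately.
            if (Thread.currentThread() == worker) {
                return false;
            }
            try {
                worker.join(timeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !worker.isAlive();
        }
    }

    /** Same failure policy as the report; returns whether both outer close() calls completed. */
    static boolean run(long timeoutMs) {
        SafeStreams streams = new SafeStreams();
        streams.setUncaughtExceptionHandler((thread, t) -> streams.close(timeoutMs));
        streams.start();
        boolean first = streams.close(timeoutMs);
        boolean second = streams.close(timeoutMs); // a repeated call is harmless
        return first && second;
    }

    public static void main(String[] args) {
        System.out.println("closed cleanly: " + run(5000));
    }
}
```

      Here the handler's close() returns at once on StreamThread-1, the thread terminates, and main's join completes well before the timeout. On the caller side, a mitigation with a similar effect should be to avoid calling close() directly on the failing thread (for example, by handing it off to a separate thread), so that the stream thread can return from the handler and terminate.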


            People

              Assignee: Unassigned
              Reporter: Steven Schlansker (stevenschlansker)
              Votes: 0
              Watchers: 5
