Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8062

StateListener is not notified when StreamThread dies

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 2.1.1
    • 2.3.0, 2.2.1
    • streams
    • None
    • Kafka 2.1.1, kafka-streams-scala version 2.1.1

    Description

      I want my application to react when streams die. Trying to use KafkaStreams.setStateListener. Also checking KafkaStreams.state() from time to time.

      The test scenario: Kafka is available, but there are no topics that my Topology is supposed to use.

      I expect streams to crash and the state listener to be notified about that, with the new state ERROR. KafkaStreams.state() should also return ERROR.

      In reality the streams crash, but the KafkaStreams.state() method always returns REBALANCING and the last time the StateListener was called, the new state was also REBALANCING. 

       

      I believe the reason for this is in the methods:

      org.apache.kafka.streams.KafkaStreams.StreamStateListener.onChange() which does not react on the state StreamsThread.State.PENDING_SHUTDOWN

      and

      org.apache.kafka.streams.processor.internals.StreamThread.RebalanceListener.onPartitionsAssigned, which calls shutdown() setting the state to PENDING_SHUTDOWN and then

      streamThread.setStateListener(null) effectively removing the state listener, so that the DEAD state of the thread never reaches KafkaStreams object.

      Here is an extract from the logs:

      14:57:03.272 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] ERROR o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer] test-input-topic is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.
      14:57:03.283 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, groupId=Test] Successfully joined group with generation 1
      14:57:03.284 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, groupId=Test] Setting newly assigned partitions []
      14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Informed to shut down
      14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN
      14:57:03.316 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutting down
      14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.KafkaConsumer - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions
      14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.p.KafkaProducer - [Producer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
      14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutdown complete

      After this calls to KafkaStreams.state() still return REBALANCING

      There is a workaround with requesting KafkaStreams.localThreadsMetadata() and checking each thread's state manually, but that seems very wrong.

      Attachments

        Issue Links

          Activity

            People

              guozhang Guozhang Wang
              andrey.v.volkov Andrey Volkov
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: