KAFKA-3758 (sub-task of KAFKA-3938: Fix consumer session timeout issue in Kafka Streams)

KStream job fails to recover after Kafka broker stopped

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0.0
    • Fix Version/s: 0.11.0.1
    • Component/s: streams
    • Labels: None

    Description

      We've been testing a fairly complex KStreams job, and under load it seems the job fails to rebalance and recover if we shut down one of the Kafka brokers. The test ran against a 3-node Kafka cluster where each topic had a replication factor of at least 2, and we terminated one of the nodes.

      The full log is attached. The root exception seems to be contention on the lock on the state directory. The job keeps trying to recover but throws lock-related errors over and over. Restarting the job itself resolves the problem.

      org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
          at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
          at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
          at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
          at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
          at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
          at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
          at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
          at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
          at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
          at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
          at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
      Caused by: java.io.IOException: Failed to lock the state directory: /muon/state/job-stream_photon_messages-1/2_82
          at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
          at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
          ... 32 more
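
      For readers unfamiliar with this failure mode, the sketch below shows how a per-task directory lock can refuse a second holder until the first one releases it. It is a standalone illustration, not the Kafka Streams implementation: it assumes the lock is a `java.nio` file lock taken on a file inside the task directory, and the class name `StateDirLockSketch`, the helper `tryLock`, and the `.lock` file name are all hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class StateDirLockSketch {

    /** Pairs the open channel with its lock so both are released together. */
    public static final class DirLock implements AutoCloseable {
        private final FileChannel channel;
        private final FileLock lock;

        DirLock(FileChannel channel, FileLock lock) {
            this.channel = channel;
            this.lock = lock;
        }

        @Override
        public void close() throws IOException {
            lock.release();
            channel.close();
        }
    }

    /**
     * Tries to lock a task directory. Returns null when the lock is already
     * held, either by another process or by another owner in this JVM.
     * The ".lock" file name is an assumption made for this sketch.
     */
    public static DirLock tryLock(File taskDir) throws IOException {
        Files.createDirectories(taskDir.toPath());
        File lockFile = new File(taskDir, ".lock");
        FileChannel channel = FileChannel.open(
                lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock();
            if (lock == null) {          // held by another process
                channel.close();
                return null;
            }
            return new DirLock(channel, lock);
        } catch (OverlappingFileLockException e) {
            channel.close();             // held elsewhere in this JVM
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical task directory under a temporary state root.
        File taskDir = Files.createTempDirectory("state").resolve("2_82").toFile();

        DirLock previousOwner = tryLock(taskDir);
        DirLock newOwner = tryLock(taskDir);   // attempted before the release
        System.out.println("previous owner holds lock: " + (previousOwner != null));
        System.out.println("new owner got lock: " + (newOwner != null));

        previousOwner.close();                 // release, then retry
        DirLock retry = tryLock(taskDir);
        System.out.println("retry after release got lock: " + (retry != null));
        retry.close();
    }
}
```

      If the job behaves like this sketch, a task re-created during a rebalance would keep failing for as long as the previous owner of the directory never releases its lock, which would be consistent with a restart clearing the errors. Whether that is what happens here is a guess based on the exception message alone.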

      Attachments

        1. muon.log.1.gz (300 kB, Greg Fodor)

            People

              Assignee: Eno Thereska (enothereska)
              Reporter: Greg Fodor (gfodor)
              Votes: 0
              Watchers: 9
