Kafka / KAFKA-5167

streams task gets stuck after re-balance due to LockException

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.0, 0.10.2.1, 0.11.0.0
    • Fix Version/s: 0.10.2.2, 0.11.0.1, 1.0.0
    • Component/s: streams
    • Labels: None

    Description

      During a rebalance, the processor node's close() method is called twice: once from StreamThread.suspendTasksAndState() and once from StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field that I close in the processor's close() method, and that instance's close() method throws an exception if it is called more than once. Because of this exception, Kafka Streams never attempts to close the state manager, i.e. task.closeStateManager(true) is never called. When a task then moves from one thread to another within the same machine, the task blocks trying to acquire the lock on the state directory, which is still held by the unclosed state manager, and keeps logging the warning below:

      2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will retry.
      org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
      at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
      at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
      at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
      at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
      at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
      at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
      at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
      at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
      at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
      at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
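
      Until a fixed version is in use, one possible application-side workaround is to make the processor's close() safe to call more than once, so the second call during a rebalance does not throw. The following is only a minimal sketch of that idea and does not describe the change made in Kafka itself. StrictResource and GuardedProcessor are hypothetical stand-ins (for the instance field and for the processor's close() logic) and do not use any Kafka classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentCloseSketch {

    /** Hypothetical stand-in for a field whose close() throws when called twice. */
    public static final class StrictResource implements AutoCloseable {
        private int closeCount = 0;

        @Override
        public void close() {
            if (closeCount > 0) {
                throw new IllegalStateException("already closed");
            }
            closeCount++;
        }

        public int closeCount() {
            return closeCount;
        }
    }

    /** Hypothetical stand-in for the processor; only the close() logic is shown. */
    public static final class GuardedProcessor {
        private final StrictResource resource = new StrictResource();
        private final AtomicBoolean closed = new AtomicBoolean(false);

        /** Closes the underlying resource on the first call; later calls are no-ops. */
        public void close() {
            if (closed.compareAndSet(false, true)) {
                resource.close();
            }
        }

        public int underlyingCloseCount() {
            return resource.closeCount();
        }
    }

    public static void main(String[] args) {
        GuardedProcessor processor = new GuardedProcessor();
        processor.close(); // first call, e.g. when tasks are suspended
        processor.close(); // second call, e.g. when non-assigned suspended tasks are closed
        System.out.println("underlying close() calls: " + processor.underlyingCloseCount());
    }
}
```

      With a guard like this, the second close() returns normally instead of raising, so the exception described above would not be thrown from the processor. Whether that alone is enough to release the state directory lock depends on the Kafka Streams version in use.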

      Attachments

        1. logs.txt (105 kB, Narendra Kumar)
        2. DebugTransformer.java (2 kB, Narendra Kumar)
        3. BugTest.java (2 kB, Narendra Kumar)

          People

            Assignee: Matthias J. Sax (mjsax)
            Reporter: Narendra Kumar
            Votes: 3
            Watchers: 10
