Kafka / KAFKA-3938 (Fix consumer session timeout issue in Kafka Streams) / KAFKA-3752

Provide a way for KStreams to recover from unclean shutdown


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0.0
    • Fix Version/s: 0.10.1.0
    • Component/s: streams

    Description

      If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM Killer), it may leave behind lock files and fail to recover.

      It would be useful to have an option (say, --force) that tells KStreams to proceed even if it finds old LOCK files. The log below shows the failure:

      [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in thread [StreamThread-1]:  (org.apache.kafka.streams.processor.internals.StreamThread:583)
      org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
      	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
      	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
      	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
      	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
      	at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
      	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
      	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
      Caused by: java.io.IOException: Failed to lock the state directory: /data/test/2/kafka-streams/test-2/0_0
      	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
      	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
      	... 32 more
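      The report does not say how the state directory is locked. Assuming it is an OS-level lock taken with java.nio's FileChannel.tryLock (an assumption), the operating system normally drops such a lock when the holding process dies, so a failed attempt usually means a live owner still holds it. One alternative to a --force flag is therefore to retry the lock a bounded number of times with a short backoff. This is a minimal, hypothetical sketch: StateDirLock and tryLockWithRetry are illustrative names, not part of the Kafka Streams API, and retry-with-backoff is a swapped-in technique, not necessarily the change that resolved this issue.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

/** Hypothetical helper for illustration only; not a Kafka Streams class. */
final class StateDirLock {
    private StateDirLock() {}

    /**
     * Tries to take an exclusive lock on the whole file behind {@code channel},
     * retrying up to {@code maxAttempts} times with a fixed {@code backoffMs} pause.
     * The channel must be open for writing (exclusive locks need a writable channel).
     * Returns the held lock, or null if every attempt found the file already locked.
     */
    static FileLock tryLockWithRetry(FileChannel channel, int maxAttempts, long backoffMs)
            throws IOException, InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            FileLock lock;
            try {
                // Non-blocking: returns null if another process holds the lock.
                lock = channel.tryLock();
            } catch (OverlappingFileLockException e) {
                // This JVM (e.g. another thread) already holds an overlapping lock.
                lock = null;
            }
            if (lock != null) {
                return lock; // caller must release() it when done
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMs); // give the current owner time to release
            }
        }
        return null;
    }
}
```

      A caller would open the lock file once with StandardOpenOption.CREATE and StandardOpenOption.WRITE and reuse that channel for every attempt; the FileLock javadoc recommends a single channel per file because, on some systems, closing any channel releases all of the JVM's locks on that file.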
      


          People

            Assignee: Unassigned
            Reporter: Roger Hoover (theduderog)
            Votes: 0
            Watchers: 5
