Description
During a rebalance, a processor node's close() method is called twice: once from StreamThread.suspendTasksAndState() and once from StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field that I close in the processor's close() method, and that instance's close() throws an exception if it is called more than once. Because of this exception, Kafka Streams does not attempt to close the state manager, i.e. task.closeStateManager(true) is never called. When a task moves from one thread to another on the same machine, the new task blocks trying to acquire the lock on the state directory, which is still held by the unclosed state manager, and keeps logging the warning below:
2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
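Until Streams stops calling close() twice, one possible application-side workaround is to make the processor's close() idempotent, so that the second call neither reaches the underlying resource nor throws. The following is only a minimal sketch of that idea: StrictResource and GuardedProcessor are hypothetical names, and GuardedProcessor is a plain class standing in for a real Kafka Streams processor so that the example runs without the Kafka dependency.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class IdempotentCloseSketch {

    /** Hypothetical stand-in for the instance field that throws when closed twice. */
    static class StrictResource implements AutoCloseable {
        private boolean closed = false;
        final AtomicInteger closeCalls = new AtomicInteger();

        @Override
        public void close() {
            closeCalls.incrementAndGet();
            if (closed) {
                throw new IllegalStateException("already closed");
            }
            closed = true;
        }
    }

    /**
     * Hypothetical processor. In a real application this class would implement
     * the Kafka Streams processor interface; only close() matters for this sketch.
     */
    static class GuardedProcessor {
        private final StrictResource resource;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        GuardedProcessor(StrictResource resource) {
            this.resource = resource;
        }

        public void close() {
            // Only the first caller flips the flag; every later call returns immediately.
            if (!closed.compareAndSet(false, true)) {
                return;
            }
            try {
                resource.close();
            } catch (RuntimeException e) {
                // Do not let a cleanup failure propagate out of close():
                // in the scenario described above, an exception here is what
                // prevents the state manager from being closed.
                System.err.println("Ignoring failure while closing resource: " + e);
            }
        }
    }

    public static void main(String[] args) {
        StrictResource resource = new StrictResource();
        GuardedProcessor processor = new GuardedProcessor(resource);
        processor.close(); // e.g. the call made while suspending tasks
        processor.close(); // e.g. the call made while closing non-assigned tasks
        System.out.println("resource.close() calls: " + resource.closeCalls.get());
    }
}
```

If the processor can be re-initialized after a suspend, the flag would have to be reset (and the resource re-created) in init(); that part is left out here. This only hides the symptom on the application side and does not change the fact that the state manager should be closed even when a processor's close() throws.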
Issue Links
- is duplicated by
  - KAFKA-5070 org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18 (Resolved)
- is part of
  - KAFKA-5485 Streams should not suspend tasks twice (Resolved)
- relates to
  - KAFKA-5397 streams are not recovering from LockException during rebalancing (Resolved)
  - KAFKA-5070 org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18 (Resolved)
- links to