Description
We've been testing a fairly complex KStreams job, and under load the job fails to rebalance and recover when we shut down one of the Kafka brokers. The test used a 3-node Kafka cluster in which every topic had a replication factor of at least 2, and we terminated one of the nodes.
The full log is attached. The root exception appears to be contention on the state directory lock. The job keeps trying to recover but throws lock-related errors over and over. Restarting the job resolves the problem.
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /muon/state/job-stream_photon_messages-1/2_82
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 32 more
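For anyone trying to reproduce the "Failed to lock the state directory" symptom in isolation, here is a minimal sketch of how a file lock that is never released keeps blocking later attempts inside the same JVM. It is only an illustration of the general `java.nio` locking behaviour, not the Kafka Streams implementation, and it does not confirm the root cause of this issue. The class name `StateDirLockSketch`, the helper methods, and the `.lock` file name are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration; not taken from the Kafka Streams code base.
public class StateDirLockSketch {

    /** Returns the lock, or null if the lock is already held (by this JVM or another process). */
    static FileLock tryLock(FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // Thrown when this JVM already holds an overlapping lock on the file.
            return null;
        }
    }

    /** Returns { firstAcquired, secondAcquired, thirdAcquiredAfterRelease }. */
    static boolean[] demo(Path taskDir) throws IOException {
        Files.createDirectories(taskDir);
        Path lockFile = taskDir.resolve(".lock"); // assumed lock file name
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock first = tryLock(channel);   // e.g. the task before the rebalance
            FileLock second = tryLock(channel);  // e.g. the re-created task; fails while first is held
            if (first != null) {
                first.release();                 // the step that must happen before re-acquiring
            }
            FileLock third = tryLock(channel);   // succeeds once the old lock is released
            boolean[] result = { first != null, second != null, third != null };
            if (third != null) {
                third.release();
            }
            return result;
        }
    }

    public static void main(String[] args) throws IOException {
        Path taskDir = Files.createTempDirectory("state").resolve("2_82");
        boolean[] r = demo(taskDir);
        System.out.println("first acquired: " + r[0]);
        System.out.println("second acquired while first held: " + r[1]);
        System.out.println("third acquired after release: " + r[2]);
    }
}
```

If the old task's lock is not released during a rebalance, every retry would hit the second case, which would be consistent with the repeated errors and with a restart clearing the problem.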
Attachments
Issue Links
- is related to KAFKA-4916 Add streams tests with brokers failing (Resolved)