Description
We are using a Kafka Streams (2.2.0) application that reads messages from a source topic, applies some transformations, and sends the results to a destination topic. The application started fine and processed messages until it encountered an UnknownHostException. Since then it has been stuck in the rebalancing state and is no longer processing messages.
Below are the properties we have configured:
application.id = *****
bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"
num.stream.threads=3
replication.factor=3
num.standby.replicas=1
max.block.ms=86400000
acks=all
auto.offset.reset=earliest
processing.guarantee=exactly_once
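For reference, here is a minimal sketch of the same settings assembled in Java as a `java.util.Properties` object, which is the form the `KafkaStreams(Topology, Properties)` constructor accepts. Assumptions: `my-streams-app` is a hypothetical stand-in for the redacted application.id, the quotes around bootstrap.servers are taken to be string delimiters rather than part of the value, and the host:port entries are the placeholders from this report.

```java
import java.util.Properties;

public class StreamsAppConfig {

    // Builds the settings listed above as plain string key/value pairs.
    // "my-streams-app" is a HYPOTHETICAL application.id (the real one is redacted);
    // the host:port strings are the report's placeholders.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");
        // ASSUMPTION: the quotes in the listing are delimiters, so the value itself has none.
        props.put("bootstrap.servers", "hostname1:port1,hostname2:port2,hostname3:port3");
        props.put("num.stream.threads", "3");
        props.put("replication.factor", "3");         // replication of Streams' internal topics
        props.put("num.standby.replicas", "1");
        props.put("processing.guarantee", "exactly_once");
        // Client-level settings; Kafka Streams passes unprefixed client configs to its embedded clients.
        props.put("max.block.ms", "86400000");        // producer: block for up to 24 hours
        props.put("acks", "all");                     // producer
        props.put("auto.offset.reset", "earliest");   // consumer
        return props;
    }

    public static void main(String[] args) {
        // Print every configured key=value pair.
        buildConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the Kafka library on the classpath, the same keys are also available as constants such as `StreamsConfig.APPLICATION_ID_CONFIG` and `StreamsConfig.PROCESSING_GUARANTEE_CONFIG`, and `StreamsConfig.producerPrefix(...)` can scope a setting such as max.block.ms to the producers only.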
Additional details:
Number of brokers: 3
Source topic: 12 partitions, replication factor 3
Destination topic: 12 partitions, replication factor 3
4 instances of the Streams application are deployed in Docker containers.
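A quick worked count of what this deployment implies may help when reading the logs. This is a sketch under one stated assumption: the topology has a single sub-topology reading the 12-partition source topic (the task id `0_8` in the producer log is consistent with sub-topology 0). Under that assumption Kafka Streams creates one active task per source partition, and with `exactly_once` in 2.2 each active task gets its own transactional producer, whose transactional.id is `<application.id>-<taskId>` (for example `streams-example-0_8` in the logs).

```java
public class TaskMath {

    // Total StreamThreads across all deployed instances.
    static int totalThreads(int instances, int threadsPerInstance) {
        return instances * threadsPerInstance;
    }

    // One active task per partition of each sub-topology
    // (strictly, per partition of the sub-topology's source topic with the most partitions).
    static int activeTasks(int subTopologies, int partitionsPerSubTopology) {
        return subTopologies * partitionsPerSubTopology;
    }

    public static void main(String[] args) {
        int threads = totalThreads(4, 3);   // 4 containers x num.stream.threads=3
        int tasks = activeTasks(1, 12);     // ASSUMPTION: a single sub-topology over 12 partitions
        // exactly_once in 2.2: one transactional producer per active task.
        int producers = tasks;
        System.out.println("threads=" + threads + " activeTasks=" + tasks
                + " transactionalProducers=" + producers);
    }
}
```

So, on average, each of the 12 stream threads owns one task and one transactional producer, although the assignor does not guarantee a perfectly even spread.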
Below are some of the logs:
[WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] o.a.k.clients.NetworkClient - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer, groupId=streams-example] Error connecting to node hostname1:port1 (id: 2147438464 rack: null)
java.net.UnknownHostException: hostname1
	at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
	at java.net.InetAddress.getAllByName(InetAddress.java:1192)
	at java.net.InetAddress.getAllByName(InetAddress.java:1126)
	at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117)
	at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387)
	at org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:850)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
[ERROR] [kafka-producer-network-thread | streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1-0_8-producer] o.a.k.c.p.internals.Sender - [Producer clientId=streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1-0_8-producer, transactionalId=streams-example-0_8] Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for connection 2
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:339)
	at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:143)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:926)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:421)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:286)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
	at java.lang.Thread.run(Thread.java:748)
[WARN] [streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1] o.a.kafka.clients.ClientUtils - Couldn't resolve server hostname1:port1 from bootstrap.servers as DNS resolution failed for hostname1
[INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer, groupId=streams-example] Group coordinator hostname1:port1 (id: 2147438646 rack: null) is unavailable or invalid, will attempt rediscovery
[INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer, groupId=streams-example] Discovered group coordinator hostname1:port1 (id: 2147438646 rack: null)
[INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer, groupId=streams-example] Group coordinator hostname1:port1 (id: 2147438646 rack: null) is unavailable or invalid, will attempt rediscovery
[INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer, groupId=streams-example] Discovered group coordinator hostname1:port1 (id: 2147438646 rack: null)
[INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-2-consumer, groupId=streams-example] Attempt to heartbeat failed since group is rebalancing
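The first stack trace fails inside `InetAddress.getAllByName`, called from `ClientUtils.resolve`. To check whether the containers can currently resolve the broker hostnames at all, a small standalone check using that same JDK call can be run inside one of the containers. This is a hypothetical diagnostic sketch, not part of Kafka; the class and method names are illustrative, and the default server list is the report's placeholder.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class BootstrapDnsCheck {

    // Returns the host part of a "host:port" entry (everything before the last ':').
    static String hostOf(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        return idx < 0 ? hostPort.trim() : hostPort.substring(0, idx).trim();
    }

    // Uses the same JDK call that throws UnknownHostException in the stack trace.
    static boolean canResolve(String host) {
        try {
            InetAddress[] addrs = InetAddress.getAllByName(host);
            return addrs.length > 0;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the real bootstrap.servers value as the first argument; the default is a placeholder.
        String servers = args.length > 0 ? args[0] : "hostname1:port1,hostname2:port2,hostname3:port3";
        for (String entry : servers.split(",")) {
            String host = hostOf(entry);
            System.out.println(host + " -> " + (canResolve(host) ? "resolved" : "UnknownHostException"));
        }
    }
}
```

Note that the JVM caches failed lookups for a short period (the `networkaddress.cache.negative.ttl` security property, 10 seconds by default), so a freshly started JVM gives the most direct picture of current DNS state.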