Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9250

Kafka streams stuck in rebalancing state after UnknownHostException

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 2.2.0
    • None
    • clients, config, network, streams
    • None

    Description

      We are using kafka streams (2.2.0) application for reading messages from source topic do some transformation and send to destination topic. Application started fine and processed messages till it encountered UnknownHostException, after which application is hung in rebalancing state and not processing messages.

       

      Below are the properties we have configured :

      application.id = *****

      bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"

      num.stream.threads=3

      replication.factor=3

      num.standby.replicas=1

      max.block.ms=86400000

      acks=all

      auto.offset.reset=earliest

      processing.guarantee=exactly_once

       

      Additional details.

      Number of brokers - 3

      Source topic partition count - 12 and replication factor of 3

      Destination topic partition count - 12 and replication factor of 3

      4 instances of stream application are deployed in docker containers.

       

      Below are the some of the logs :

      [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] 
      o.a.k.clients.NetworkClient - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer, groupId=streams-example] Error connecting to node hostname1:port1 (id: 2147438464 rack: null)
      java.net.UnknownHostException: hostname1 
      at java.net.InetAddress.getAllByName0(InetAddress.java:1280) 
      at java.net.InetAddress.getAllByName(InetAddress.java:1192) 
      at java.net.InetAddress.getAllByName(InetAddress.java:1126) 
      at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) 
      at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387) 
      at org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121) 
      at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917) 
      at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235) 
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317) 
      at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) 
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) 
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) 
      at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941) 
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:850) 
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) 
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)

       

       

       

      [ERROR] [kafka-producer-network-thread | streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1-0_8-producer]
       o.a.k.c.p.internals.Sender - [Producer clientId=streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1-0_8-producer, transactionalId=streams-example-0_8] Uncaught error in kafka producer I/O thread:
      java.lang.IllegalStateException: No entry found for connection 2 
      at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:339) 
      at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:143) 
      at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:926) 
      at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) 
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65) at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:421) 
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:286) 
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235) 
      at java.lang.Thread.run(Thread.java:748)

       

       

       

      [WARN] [streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1] o.a.kafka.clients.ClientUtils - Couldn't resolve server hostname1:port1 from bootstrap.servers as DNS resolution failed for hostname1

       

       

      [INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer, groupId=streams-example] Group coordinator hostname1:port1 (id: 2147438646 rack: null) is unavailable or invalid, will attempt rediscovery
      [INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer, groupId=streams-example] Discovered group coordinator hostname1:port1 (id: 2147438646 rack: null)
      [INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer, groupId=streams-example] Group coordinator hostname1:port1 (id: 2147438646 rack: null) is unavailable or invalid, will attempt rediscovery
      [INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer, groupId=streams-example] Discovered group coordinator hostname1:port1 (id: 2147438646 rack: null)

       

       

      [INFO] [kafka-coordinator-heartbeat-thread | streams-example] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-2-consumer, groupId=streams-example] Attempt to heartbeat failed since group is rebalancing

      Attachments

        Activity

          People

            Unassigned Unassigned
            vijay0309 Vijay
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: