Kafka / KAFKA-8206

A consumer can't discover new group coordinator when the cluster was partly restarted

Details

    • Bug
    • Status: In Progress
    • Critical
    • Resolution: Unresolved
    • 3.1.0, 3.2.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1, 3.2.2, 3.2.3, 3.3.1
    • None
    • clients
    • Important

    Description

      Preconditions:
      I use Kafka server and the Java kafka-client library, version 2.2.
      I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 ZooKeeper node (localhost:2181).
      All my topics have replication factor 2, and 'unclean.leader.election.enable=true' is set on both Kafka nodes.
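
A minimal sketch of the client-side settings implied by this setup, for anyone trying to reproduce it. The bootstrap list, group id, and client id are taken from the steps and logs in this report; the deserializer choice and the class name `ReproConsumerConfig` are assumptions made only for illustration.

```java
import java.util.Properties;

final class ReproConsumerConfig {
    // Builds consumer settings matching the report; assumed values are marked below.
    static Properties build() {
        Properties props = new Properties();
        // Both brokers from the preconditions are listed as bootstrap servers.
        props.setProperty("bootstrap.servers", "localhost:9092,localhost:9093");
        // Group id and client id as they appear in the log lines of this report.
        props.setProperty("group.id", "events-group-gabriel");
        props.setProperty("client.id", "events-consumer0");
        // Assumption: the report does not say which deserializers were used.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the report.
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the kafka-clients library on the classpath, these properties could be passed to `new KafkaConsumer<>(props)` and the consumer subscribed to the `events-sorted` topic seen in the logs.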

      Steps to reproduce:

      1) Start 2 nodes (localhost:9092 / localhost:9093)
      2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'

      // discovered group coordinator (0-node)
      2019-04-09 16:23:18,963 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
      
      // metadata cache is updated (2 nodes in the cluster list)
      2019-04-09 16:23:18,928 DEBUG [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending metadata request (type=MetadataRequest, topics=<ALL>) to node localhost:9092 (id: -1 rack: null)>
      2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), localhost:9093 (id: 1 rack: null)], partitions = [], controller = localhost:9092 (id: 0 rack: null))}>
      

      3) Shutdown 1-node (localhost:9093)

      // metadata was updated to version 4 (but for some reason it still listed 2 alive nodes in the cluster)
      2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = localhost:9092 (id: 0 rack: null))}>
      
      // the consumer thinks that 1-node is still alive and tries to send the coordinator lookup to it, but fails
      2019-04-09 16:23:46,981 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
      2019-04-09 16:23:46,981 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery>
      2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
      2019-04-09 16:24:01,117 WARN [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 (localhost:9093) could not be established. Broker may not be available.>
      
      // refreshing metadata again
      2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 being disconnected>
      2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Coordinator discovery failed, refreshing metadata>
      
      // metadata was updated to version 5, where the cluster had only 0-node (localhost:9092), as expected
      2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = [1]), Partition(topic = events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = [1])], controller = localhost:9092 (id: 0 rack: null))}>
      
      // 0-node discovered as coordinator
      2019-04-09 16:24:01,132 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
      

      At this point the consumer stores only information about 0-node (localhost:9092) in the cluster property of the metadata cache.
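
A note for reading the log lines above: the coordinator ids they print (2147483647 for localhost:9092, 2147483646 for localhost:9093) are consistent with `Integer.MAX_VALUE` minus the broker id. The helper below is a small sketch of that mapping; the class and method names are hypothetical and not part of the Kafka client API.

```java
final class CoordinatorIds {
    // Maps the id printed in "Discovered group coordinator" lines back to a broker id,
    // assuming the id is Integer.MAX_VALUE minus the broker id (as the logs suggest).
    static int brokerIdOf(int coordinatorConnectionId) {
        return Integer.MAX_VALUE - coordinatorConnectionId;
    }

    public static void main(String[] args) {
        System.out.println(brokerIdOf(2147483647)); // 0-node, localhost:9092
        System.out.println(brokerIdOf(2147483646)); // 1-node, localhost:9093
    }
}
```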

      4) Shutdown 0-node (localhost:9092)
      5) Start 1-node (localhost:9093)

      // the consumer tries to reconnect only to 0-node
      2019-04-09 16:24:40,649 DEBUG [org.apache.kafka.common.network.Selector.pollSelectionKeys] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Connection with localhost disconnected>
      2019-04-09 16:24:40,649 DEBUG [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Node 0 disconnected.>
      2019-04-09 16:24:40,649 DEBUG [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Node 2147483647 disconnected.>
      2019-04-09 16:24:40,649 DEBUG [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=10, clientId=events-consumer0, correlationId=209) due to node 0 being disconnected>
      2019-04-09 16:24:40,649 INFO [org.apache.kafka.clients.FetchSessionHandler.handleError] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Error sending fetch request (sessionId=1754516055, epoch=132) to node 0: org.apache.kafka.common.errors.DisconnectException.>
      2019-04-09 16:24:40,650 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery>
      2019-04-09 16:24:40,650 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.lookupCoordinator] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] No broker available to send FindCoordinator request>
      2019-04-09 16:24:40,650 DEBUG [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Give up sending metadata request since no node is available>
      

      As far as I can see, the consumer tries to connect only to 0-node because it is the only node in the consumer's cluster metadata cache. As a result, the consumer can't connect to the new group coordinator (1-node) and can't poll any messages.
      To leave this stuck state, either 0-node must be restored or the consumer must be restarted (in which case the consumer discovers the most recently started node, 1-node in my case, from the initial broker list).
      P.S. The same behavior can be reproduced with a 3-node cluster.
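
The sequence above can be replayed with a toy model of the consumer's view of the cluster. This is only a sketch of the behavior described in this report, not the real client code: `ClientView`, `onMetadataResponse`, and `pickReachable` are hypothetical names, and the model assumes that each metadata response simply replaces the list of known nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class StaleMetadataDemo {
    static final String NODE_0 = "localhost:9092";
    static final String NODE_1 = "localhost:9093";

    /** Toy stand-in for the consumer's cluster view; not the real Metadata class. */
    static final class ClientView {
        private List<String> knownNodes;

        ClientView(List<String> bootstrapServers) {
            this.knownNodes = new ArrayList<>(bootstrapServers);
        }

        // A metadata response replaces the known nodes with the nodes it reports.
        void onMetadataResponse(List<String> reportedNodes) {
            this.knownNodes = new ArrayList<>(reportedNodes);
        }

        // Returns the first known node that is currently up, if there is one.
        Optional<String> pickReachable(Set<String> nodesUp) {
            return knownNodes.stream().filter(nodesUp::contains).findFirst();
        }
    }

    // Replays steps 2 to 5 and returns the node the client could still reach.
    static Optional<String> replay() {
        ClientView view = new ClientView(List.of(NODE_0, NODE_1));
        view.onMetadataResponse(List.of(NODE_0, NODE_1)); // step 2: both nodes listed
        view.onMetadataResponse(List.of(NODE_1, NODE_0)); // step 3, version 4: still both
        view.onMetadataResponse(List.of(NODE_0));         // step 3, version 5: only 0-node
        Set<String> nodesUp = Set.of(NODE_1);             // steps 4 and 5: only 1-node is up
        return view.pickReachable(nodesUp);
    }

    public static void main(String[] args) {
        System.out.println(replay().orElse("no node available")); // prints "no node available"
    }
}
```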

      The question: is it by design that the consumer's metadata cache stores only the list of active nodes?
      It leads to a situation where the last active node in the list goes down and another node comes back up, but the consumer has no information about that node and keeps trying to reconnect to the last active node, ignoring the new one.

      From my point of view, a consumer should always keep the initial node list and try to reconnect to the nodes from that list when none of the nodes in the cached cluster metadata is alive.
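
The suggestion above could look roughly like the following. It is a sketch of the idea under stated assumptions, not an existing Kafka client feature: `BootstrapFallback` and `candidates` are hypothetical names, and the set of reachable nodes stands in for whatever connection-state tracking the client already does.

```java
import java.util.List;
import java.util.Set;

final class BootstrapFallback {
    // Returns the nodes to try next: the cached nodes while at least one of them is
    // reachable, otherwise the initial bootstrap list.
    static List<String> candidates(List<String> cachedNodes,
                                   List<String> bootstrapServers,
                                   Set<String> reachableNodes) {
        boolean anyCachedAlive = cachedNodes.stream().anyMatch(reachableNodes::contains);
        return anyCachedAlive ? cachedNodes : bootstrapServers;
    }

    public static void main(String[] args) {
        List<String> cached = List.of("localhost:9092");                      // after step 3
        List<String> bootstrap = List.of("localhost:9092", "localhost:9093"); // initial list
        Set<String> reachable = Set.of("localhost:9093");                     // after step 5
        // The cache holds no live node, so the initial list is tried again.
        System.out.println(candidates(cached, bootstrap, reachable));
    }
}
```

Falling back only when no cached node is reachable leaves the normal path unchanged: as long as the cache contains a live broker, the client keeps using the fresher cached view.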

            People

              ivanyu Ivan Yurchenko
              agabriel alex gabriel