KAFKA-10039

A Kafka broker was shut down gracefully, and incorrect metadata was passed to the Kafka Connect client.


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None

    Description

      For server maintenance, one of the 20 brokers was shut down gracefully, but every worker in the kafka-sink-connect cluster suddenly died with the following NPE:

       

      // error log: kafka distributed sink connect 
      
      [2020-05-22 15:16:20,433] ERROR [Worker clientId=connect-1, groupId=dc2-log-hyper-connector] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
      java.lang.NullPointerException
          at java.util.Objects.requireNonNull(Objects.java:203)
          at org.apache.kafka.common.Cluster.<init>(Cluster.java:134)
          at org.apache.kafka.common.Cluster.<init>(Cluster.java:89)
          at org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:120)
          at org.apache.kafka.clients.MetadataCache.<init>(MetadataCache.java:82)
          at org.apache.kafka.clients.MetadataCache.<init>(MetadataCache.java:58)
          at org.apache.kafka.clients.Metadata.handleMetadataResponse(Metadata.java:325)
          at org.apache.kafka.clients.Metadata.update(Metadata.java:252)
          at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1059)
          at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:845)
          at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:548)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
          at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.poll(WorkerCoordinator.java:154)
          at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.poll(WorkerGroupMember.java:166)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:355)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:245)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      

       

      The replication factor of all topics was more than 2, and there were 50 topics and 200 partitions in total. After checking the error against the Kafka library source code, it seems the error occurred when the Connect Distributed Herder (client) cached the metadata received from the broker, including the broker_node_id_set and partition_info_set.

      // source code: org.apache.kafka.common.Cluster (v2.3.1)
      
      // index the partition infos by topic, topic+partition, and node
      // note that this code is performance sensitive if there are a large number of partitions so we are careful
      // to avoid unnecessary work
      Map<TopicPartition, PartitionInfo> tmpPartitionsByTopicPartition = new HashMap<>(partitions.size());
      Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
      for (PartitionInfo p : partitions) {
          tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
          List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
          if (partitionsForTopic == null) {
              partitionsForTopic = new ArrayList<>();
              tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
          }
          partitionsForTopic.add(p);
          if (p.leader() != null) {
              // The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
              // in the metadata response
              List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
              partitionsForNode.add(p);
          }
      }
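
      For illustration, here is a minimal standalone sketch of the condition that trips the requireNonNull above (the class and field names are made up for the example and are not the real Kafka types): if the metadata response contains a partition whose leader id is not among the brokers returned in that same response, the per-node map lookup returns null and the Cluster constructor throws the NullPointerException seen in the log.

      // sketch (hypothetical stand-in types, not the actual Kafka code)
      
      import java.util.*;
      
      public class ClusterIndexNpeSketch {
      
          // Hypothetical stand-in for org.apache.kafka.common.PartitionInfo.
          static class Partition {
              final String topic;
              final int partition;
              final Integer leaderId; // null means "no leader"
      
              Partition(String topic, int partition, Integer leaderId) {
                  this.topic = topic;
                  this.partition = partition;
                  this.leaderId = leaderId;
              }
          }
      
          public static void main(String[] args) {
              // Brokers listed in the metadata response; broker 3 is missing,
              // e.g. because it was being removed while the response was built.
              Set<Integer> nodeIds = new HashSet<>(Arrays.asList(1, 2));
      
              // Cluster pre-populates one partition list per known node.
              Map<Integer, List<Partition>> partitionsByNode = new HashMap<>();
              for (Integer id : nodeIds)
                  partitionsByNode.put(id, new ArrayList<Partition>());
      
              // A partition whose leader is broker 3, which is absent from the node map.
              List<Partition> partitions = Collections.singletonList(new Partition("logs", 0, 3));
      
              for (Partition p : partitions) {
                  if (p.leaderId != null) {
                      // Same pattern as Cluster: get() returns null for the unknown leader id,
                      // so requireNonNull throws the NullPointerException from the report.
                      List<Partition> partitionsForNode = Objects.requireNonNull(partitionsByNode.get(p.leaderId));
                      partitionsForNode.add(p);
                  }
              }
          }
      }

      If that reading is right, the in-code comment ("the broker guarantees that if a partition has a non-null leader, it is one of the brokers returned in the metadata response") points to a metadata response that violated this guarantee, presumably assembled while the broker being shut down was dropping out of the live broker set.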
      

      How can this happen, and how can it be dealt with in the future? (Both the broker and the client are running v2.3.1.)


          People

            Assignee: Unassigned
            Reporter: Dogil MinsuJo
            Votes: 0
            Watchers: 1
