Samza / SAMZA-590

Dead Kafka broker ignores new leader

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: 0.9.0
    • Component/s: kafka
    • Labels:
      None

      Description

      We recently discovered a bug in BrokerProxy in which the fetcher thread keeps trying to fetch from a dead leader, even after a new leader has been elected.

      We were doing a slow rolling bounce of one of our Kafka clusters: we take a broker offline, do extended maintenance on the machine, then bring the broker back up. We observed that when the broker went offline and the other brokers took over leadership of its partitions, the BrokerProxy thread never picked up the change.

      The containers logged this every 10s:

      2015-03-09 19:14:19 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Turn on debugging to get a full stack trace.
      2015-03-09 19:14:19 BrokerProxy [DEBUG] Exception detail:
      java.nio.channels.ClosedChannelException
      	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
      	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
      	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
      	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
      	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
      	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:174)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:145)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:132)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:131)
      	at java.lang.Thread.run(Thread.java:745)
      

      The problem appears to be that the BrokerProxy thread never gives up ownership of its TopicAndPartitions when a consumer failure occurs; it just reconnects and fetches again. If the broker stays down for a while, the thread ends up owning TopicAndPartitions that are now led by other brokers.
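To make the failure mode concrete, here is a minimal Scala sketch of the pre-fix loop. It assumes simplified, hypothetical types (`FakeBroker`, `PreFixBrokerProxy`, and a local `TopicAndPartition` case class standing in for Kafka's class of the same name). It is not Samza's actual BrokerProxy code. It shows one thing: a failed fetch only leads to another attempt against the same broker, so the proxy's set of owned partitions never shrinks.

```scala
import java.nio.channels.ClosedChannelException
import scala.collection.mutable

// Hypothetical stand-in for Kafka's TopicAndPartition.
final case class TopicAndPartition(topic: String, partition: Int)

// Hypothetical broker client: when `alive` is false, every request fails
// the way a closed socket does.
final class FakeBroker(val host: String, var alive: Boolean) {
  def fetch(tps: Set[TopicAndPartition]): Unit =
    if (!alive) throw new ClosedChannelException()
}

// Simplified pre-fix behaviour: a failure only triggers another attempt
// against the same broker, and ownership is never released.
final class PreFixBrokerProxy(broker: FakeBroker) {
  val owned = mutable.Set.empty[TopicAndPartition]

  // One pass of the fetch loop; returns true if the fetch succeeded.
  def fetchOnce(): Boolean =
    try {
      broker.fetch(owned.toSet)
      true
    } catch {
      case _: ClosedChannelException =>
        // "Restarting consumer ..." is logged and the loop retries later,
        // but `owned` is left untouched.
        false
    }
}
```

While the broker is dead, every `fetchOnce()` call fails and `owned` still contains the partition afterwards. This mirrors the repeated ClosedChannelException warnings above: the thread keeps retrying a broker that no longer leads its partitions.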

      Attachment: SAMZA-590-0.patch (6 kB, Chris Riccomini)


          Activity

          Chris Riccomini added a comment:

          Thanks Yan Fang! Merged and committed.

          Yan Fang added a comment:

          +1

          Chris Riccomini added a comment:

          Attaching patch. RB at:

          https://reviews.apache.org/r/31909

          1. Moved the "abdicate" method to be a top-level method within the class.
          2. Added an abdicateAll method that abdicates all TopicAndPartitions.
          3. Invoked abdicateAll whenever a consumer failure occurs.
          4. Wrote a unit test to validate that the BrokerProxy relinquishes ownership for all TopicAndPartitions on failure, and that it continues to refresh dropped TopicAndPartitions (in case its broker comes alive again) afterwards.
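The first three steps of the patch can be sketched roughly as follows. This is a simplified, hypothetical model, not the code in SAMZA-590-0.patch. `AbdicationSink` stands in for whatever component receives released partitions (the verification log below shows KafkaSystemConsumer doing this). `fetch` is an injected function that throws when the broker is unreachable. `BrokerProxySketch` and `addTopicPartition` are illustrative names.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for Kafka's TopicAndPartition.
final case class TopicAndPartition(topic: String, partition: Int)

// Hypothetical receiver of released partitions (KafkaSystemConsumer plays
// this role in the log, printing "Abdicating for [...]").
trait AbdicationSink {
  def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit
}

// Simplified post-fix proxy; `fetch` is an injected, hypothetical hook that
// throws when the broker is unreachable.
final class BrokerProxySketch(
    fetch: Set[TopicAndPartition] => Unit,
    sink: AbdicationSink) {

  // Partitions this proxy owns, mapped to the next offset to fetch.
  val nextOffsets = mutable.Map.empty[TopicAndPartition, Long]

  def addTopicPartition(tp: TopicAndPartition, nextOffset: Long): Unit =
    nextOffsets(tp) = nextOffset

  // Step 1: abdicate as a top-level method. Drops ownership of one
  // partition and hands its next offset to the sink.
  def abdicate(tp: TopicAndPartition): Unit =
    nextOffsets.remove(tp).foreach(offset => sink.abdicate(tp, offset))

  // Step 2: abdicate every owned partition. Iterates over a snapshot of the
  // keys because abdicate mutates the map.
  def abdicateAll(): Unit =
    nextOffsets.keys.toList.foreach(abdicate)

  // Step 3: on any non-fatal consumer failure, release everything before
  // the surrounding loop reconnects.
  def fetchOnce(): Boolean =
    try {
      fetch(nextOffsets.keySet.toSet)
      true
    } catch {
      case NonFatal(_) =>
        abdicateAll()
        false
    }
}
```

In this sketch the proxy releases ownership eagerly. It does not guess whether its broker will come back. Instead, it hands every partition and its next offset back, so the consumer can re-home each partition to whichever broker currently leads it.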

          I also ran this code in one of our jobs on our unstable cluster (the one we're bouncing, as described in the description) and found that the BrokerProxy properly relinquished ownership:

          2015-03-10 17:42:31 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
          2015-03-10 17:42:31 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Abdicating for [topic1,14]
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 VerifiableProperties [INFO] Verifying properties
          2015-03-10 17:42:31 VerifiableProperties [INFO] Property client.id is overridden to samza_consumer-job_name-i001-1425999995883-2
          2015-03-10 17:42:31 VerifiableProperties [INFO] Property metadata.broker.list is overridden to broker-vip:10251
          2015-03-10 17:42:31 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 60000
          2015-03-10 17:42:31 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:broker-vip,port:10251 with correlation id 1 for 1 topic(s) Set(topic1)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 SyncProducer [INFO] Connected to broker-vip:10251 for producing
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 SyncProducer [INFO] Disconnecting from broker-vip:10251
          2015-03-10 17:42:31 BrokerProxy [INFO] Creating new SimpleConsumer for host broker1:10251 for system kafka
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 GetOffset [INFO] Validating offset 18198544 for topic and partition [topic1,14]
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([topic1,14] -> 18198544)
          2015-03-10 17:42:32 GetOffset [INFO] Able to successfully read from offset 18198544 for topic and partition [topic1,14]. Using it to instantiate consumer.
          2015-03-10 17:42:32 BrokerProxy [INFO] Starting BrokerProxy for broker1:10251
          2015-03-10 17:42:32 BrokerProxy [INFO] Creating new SimpleConsumer for host broker2:10251 for system kafka
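The log suggests what happens after abdication. The consumer keeps logging "Refreshing brokers for" the released partition until a metadata lookup names a leader. It then hands the partition, with its saved offset (18198544), to a proxy for that host (broker1:10251 here). Below is a minimal sketch of that refresh loop. It assumes a hypothetical `leaderOf` lookup in place of the real metadata request and omits the offset-validation step. `RefreshSketch`, `pending`, and `proxies` are illustrative names, not Samza's.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's TopicAndPartition.
final case class TopicAndPartition(topic: String, partition: Int)

// Simplified, hypothetical model of the consumer-side refresh seen in the
// log. `leaderOf` replaces the real metadata request: it returns the
// leader's host:port, or None if no leader is known yet.
final class RefreshSketch(leaderOf: TopicAndPartition => Option[String]) {
  // Abdicated partitions waiting for a leader, with their next offsets.
  val pending = mutable.Map.empty[TopicAndPartition, Long]
  // host:port -> partitions (and next offsets) assigned to that host's proxy.
  val proxies = mutable.Map.empty[String, mutable.Map[TopicAndPartition, Long]]

  // Called when a proxy gives a partition up ("Abdicating for [...]").
  def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit =
    pending(tp) = nextOffset

  // One "Refreshing brokers for: Map(...)" pass: re-home every pending
  // partition whose leader is now known; the rest stay pending for the
  // next pass.
  def refreshBrokers(): Unit =
    for ((tp, offset) <- pending.toList; host <- leaderOf(tp)) {
      val assigned =
        proxies.getOrElseUpdate(host, mutable.Map.empty[TopicAndPartition, Long])
      assigned(tp) = offset
      pending.remove(tp)
    }
}
```

A partition whose leader lookup fails stays in `pending` and is retried on the next pass. That roughly corresponds to the "continues to refresh dropped TopicAndPartitions" behaviour described in step 4 of the patch summary.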
          

            People

            • Assignee: Chris Riccomini
            • Reporter: Chris Riccomini
            • Votes: 0
            • Watchers: 2

              Dates

              • Created:
                Updated:
                Resolved:
