SAMZA-590: Dead Kafka broker ignores new leader


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: 0.9.0
    • Component/s: kafka
    • Labels: None

    Description

      We recently discovered a bug in BrokerProxy, where the fetcher thread will continue trying to fetch from a dead leader, even if a new leader has been elected.

      We were doing a slow rolling bounce of one of our Kafka clusters: take a broker offline, do extended maintenance on the machine, then bring the broker back up. We observed that when the broker was taken offline and the other brokers took over leadership of its partitions, the BrokerProxy thread never saw the update.

      The containers logged this every 10s:

      2015-03-09 19:14:19 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Turn on debugging to get a full stack trace.
      2015-03-09 19:14:19 BrokerProxy [DEBUG] Exception detail:
      java.nio.channels.ClosedChannelException
      	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
      	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
      	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
      	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
      	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
      	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:174)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:145)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:132)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:131)
      	at java.lang.Thread.run(Thread.java:745)
      

      The problem appears to be that the BrokerProxy thread never lets go of ownership of its TopicAndPartitions when a consumer failure occurs. It just tries to reconnect and fetch again. If the broker is down for a while, this results in the thread owning TopicAndPartitions that are now led by other brokers.
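
      For illustration, here is a minimal sketch of the behavior we want, in Scala with hypothetical names (LeaderAwareProxy, abdicate, fetchLoopIteration are made up for this sketch and are not the actual BrokerProxy API): on a consumer failure, the proxy releases its TopicAndPartitions so their leaders can be re-looked-up, instead of only reconnecting to the same broker.

      import java.nio.channels.ClosedChannelException

      // Hypothetical names throughout; this is not the real BrokerProxy code.
      case class TopicAndPartition(topic: String, partition: Int)

      class LeaderAwareProxy(
          brokerHost: String,
          // Callback that hands a partition back to the system consumer so that
          // leader metadata is refreshed and a proxy for the new leader takes over.
          abdicate: TopicAndPartition => Unit) {

        private var owned = Set.empty[TopicAndPartition]

        def addTopicPartition(tp: TopicAndPartition): Unit =
          owned += tp

        // Stand-in for SimpleConsumer.fetch; here it always fails, as it would
        // against a broker that has been taken offline.
        private def fetch(tps: Set[TopicAndPartition]): Unit =
          throw new ClosedChannelException

        def fetchLoopIteration(): Unit =
          try {
            fetch(owned)
          } catch {
            case _: ClosedChannelException =>
              // The key point: release ownership instead of only reconnecting,
              // so the partitions can be re-registered with their new leader.
              owned.foreach(abdicate)
              owned = Set.empty
          }
      }

      object BrokerFailoverDemo extends App {
        val proxy = new LeaderAwareProxy(
          "broker-1:9092",
          tp => println(s"abdicated $tp; will re-look-up its leader"))

        proxy.addTopicPartition(TopicAndPartition("PageViewEvent", 0))
        proxy.fetchLoopIteration() // prints the abdication message
      }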

      Attachments

        1. SAMZA-590-0.patch (6 kB, Chris Riccomini)



      People

        Assignee: Chris Riccomini
        Reporter: Chris Riccomini
        Votes: 0
        Watchers: 2
