SAMZA-753: BrokerProxy stop should stop the Kafka consumer first

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.10.0

    Description

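      Currently, the BrokerProxy stop method only interrupts the BrokerProxy thread and does not close the simple consumer. The interrupt is therefore reported as a socket error, and BrokerProxy treats it as a broker failure rather than a shutdown. This causes the following exception when we stop the KafkaSystemConsumer: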

      2015-08-04 14:00:34 CoordinatorStreamSystemConsumer [INFO] Stopping coordinator stream system consumer.
      2015-08-04 14:00:34 BrokerProxy [INFO] Shutting down BrokerProxy for 10.151.111.27:9092
      2015-08-04 14:00:34 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
      2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
      2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
      2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
      2015-08-04 14:00:34 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
      2015-08-04 14:00:34 BrokerProxy [DEBUG] Exception detail:
      java.nio.channels.ClosedChannelException
      	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
      	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
      	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
      	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
      	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
      	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132)
      	at java.lang.Thread.run(Thread.java:745)
      2015-08-04 14:00:34 BrokerProxy [DEBUG] Removed [__samza_coordinator_simple-task0715_1,0]
      2015-08-04 14:00:34 KafkaSystemConsumer [INFO] Abdicating for [__samza_coordinator_simple-task0715_1,0]
      2015-08-04 14:00:34 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([__samza_coordinator_simple-task0715_1,0] -> 931)
      2015-08-04 14:00:34 BrokerProxy [INFO] Shutting down due to interrupt.
      

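The first two exceptions in the log are consistent with standard Java NIO behavior: interrupting a thread that is inside a blocking read on an interruptible channel closes that channel and raises ClosedByInterruptException, and any later use of the same channel fails because it is already closed. The following is a minimal sketch of that behavior only. It uses a `java.nio.channels.Pipe` as a stand-in for the broker socket; the class name `InterruptClosesChannel` and the event strings are made up for illustration and are not part of Samza or Kafka.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class InterruptClosesChannel {
    // Returns a list of observed events, in order.
    static List<String> run() throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        Pipe pipe = Pipe.open(); // assumption: stands in for the broker socket
        CountDownLatch started = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                started.countDown();
                // Nothing is ever written to the pipe, so this read blocks
                // until the thread is interrupted.
                pipe.source().read(ByteBuffer.allocate(16));
            } catch (ClosedByInterruptException e) {
                events.add("first read: ClosedByInterruptException");
            } catch (IOException e) {
                events.add("first read: " + e.getClass().getSimpleName());
            }
        });
        fetcher.start();
        started.await();
        fetcher.interrupt(); // the only thing an interrupt-only stop does
        fetcher.join();

        // The interrupt closed the channel as a side effect.
        events.add("channel open: " + pipe.source().isOpen());
        try {
            // A retry on the same channel cannot succeed.
            pipe.source().read(ByteBuffer.allocate(16));
        } catch (ClosedChannelException e) {
            events.add("retry failed: channel is closed");
        }
        pipe.sink().close();
        return events;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

In this sketch the retry plays the role of the consumer's reconnect attempt: the fetch thread cannot tell from the exception alone whether the channel was closed by a failing broker or by its own shutdown.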
      We should stop the simple consumer first, then interrupt the BrokerProxy.
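The proposed ordering can be illustrated with a small model. This is a sketch under stated assumptions, not the attached patch: `FakeConsumer`, `StopOrderSketch`, and the event strings are hypothetical, and the blocking fetch is simulated with `Thread.sleep` instead of a real Kafka consumer. The point it shows is that if the consumer is marked closed before the thread is interrupted, the fetch loop can recognize the resulting exception as a shutdown instead of taking the restart path.

```java
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class StopOrderSketch {
    /** Hypothetical stand-in for the simple consumer; not Kafka's class. */
    static final class FakeConsumer {
        private volatile boolean closed = false;

        void close() { closed = true; }

        boolean isClosed() { return closed; }

        /** Blocks like a network fetch; an interrupt surfaces as a closed channel. */
        void fetch() throws ClosedChannelException {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                throw new ClosedChannelException();
            }
        }
    }

    /** Runs one fetch thread, stops it, and returns what the thread decided to do. */
    static List<String> stop(boolean closeConsumerFirst) throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        FakeConsumer consumer = new FakeConsumer();
        CountDownLatch fetching = new CountDownLatch(1);

        Thread proxy = new Thread(() -> {
            try {
                fetching.countDown();
                consumer.fetch();
            } catch (ClosedChannelException e) {
                // Without the closed flag, this looks like a broker failure.
                events.add(consumer.isClosed() ? "shutdown" : "restart consumer");
            }
        });
        proxy.start();
        fetching.await();

        if (closeConsumerFirst) {
            consumer.close(); // proposed order: stop the consumer first...
        }
        proxy.interrupt();    // ...then interrupt the thread
        proxy.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupt only: " + stop(false));
        System.out.println("close first:    " + stop(true));
    }
}
```

With `stop(false)` the model takes the "restart consumer" branch, which corresponds to the WARN line and the partition abdication in the log above; with `stop(true)` it takes the "shutdown" branch.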

      Attachments

        1. SAMZA-753.patch
          0.6 kB
          Yan Fang
        2. SAMZA-753.2.patch
          1 kB
          Yan Fang

          People

            Assignee: Yan Fang
            Reporter: Yan Fang
            Votes: 0
            Watchers: 3