Uploaded image for project: 'Samza'
  1. Samza
  2. SAMZA-753

BrokerProxy stop should stop the Kafka consumer first

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.0
    • Component/s: None
    • Labels:
      None

      Description

      Currently, in the BrokerProxy stop method, we only interrupt the BrokerProxy thread, and do not take care of the simple consumer. This causes the following exception when we stop the KafkaSystemConsumer:

      2015-08-04 14:00:34 CoordinatorStreamSystemConsumer [INFO] Stopping coordinator stream system consumer.
      2015-08-04 14:00:34 BrokerProxy [INFO] Shutting down BrokerProxy for 10.151.111.27:9092
      2015-08-04 14:00:34 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
      2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
      2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
      2015-08-04 14:00:34 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.151.111.27:9092
      2015-08-04 14:00:34 BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
      2015-08-04 14:00:34 BrokerProxy [DEBUG] Exception detail:
      java.nio.channels.ClosedChannelException
      	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
      	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
      	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
      	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
      	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
      	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132)
      	at java.lang.Thread.run(Thread.java:745)
      2015-08-04 14:00:34 BrokerProxy [DEBUG] Removed [__samza_coordinator_simple-task0715_1,0]
      2015-08-04 14:00:34 KafkaSystemConsumer [INFO] Abdicating for [__samza_coordinator_simple-task0715_1,0]
      2015-08-04 14:00:34 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([__samza_coordinator_simple-task0715_1,0] -> 931)
      2015-08-04 14:00:34 BrokerProxy [INFO] Shutting down due to interrupt.
      

      We should stop the simple consumer first, then interrupt the BrokerProxy.

        Attachments

        1. SAMZA-753.patch
          0.6 kB
          Yan Fang
        2. SAMZA-753.2.patch
          1 kB
          Yan Fang

          Activity

            People

            • Assignee:
              closeuris Yan Fang
              Reporter:
              closeuris Yan Fang
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: