Samza / SAMZA-789

CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.0
    • Component/s: None
    • Labels: None

    Description

      The following bug has been observed in 0.10:
      1) When the coordinator stream is enabled, the OffsetManager starts the CheckpointManager internally, which registers and starts the enclosed SystemConsumer first.
      2) After the container has started, the LocalityManager starts and registers the enclosed SystemConsumer again, even though that consumer has already been started.

      This causes a problem in KafkaSystemConsumer, which logs the following warning in an endless retry loop inside KafkaSystemConsumer.refreshBrokers():

      2015-10-06 17:10:24 BrokerProxy [DEBUG] Adding new topic and partition [__samza_coordinator_acg-test_i001,0] to queue for lca1-app0908.corp.linkedin.com
      2015-10-06 17:10:24 KafkaSystemConsumer [WARN] While refreshing brokers for [__samza_coordinator_acg-test_i001,0]: org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]. Retrying.
      2015-10-06 17:10:24 KafkaSystemConsumer [DEBUG] Exception detail:
      org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]
      	at org.apache.samza.system.kafka.Toss$class.toss(Toss.scala:27)
      	at org.apache.samza.system.kafka.BrokerProxy.toss(BrokerProxy.scala:51)
      	at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:96)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:165)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:178)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:144)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:143)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.refreshDropped(KafkaSystemConsumer.scala:195)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:142)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
      	at java.lang.Thread.run(Thread.java:745)
      

      The quick fix is to avoid registering the same coordinator stream system partition again after the CoordinatorStreamSystemConsumer has already started.
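
A minimal sketch of such a guard, for illustration only. This is not the attached patch: GuardedCoordinatorConsumer and its methods are hypothetical stand-ins, and partitions are modelled as plain strings instead of Samza's own types. The idea is that register() becomes a no-op once start() has been called, so a second manager cannot hand an already-consumed partition to the underlying consumer.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a coordinator stream consumer wrapper.
// It only models the register/start ordering, not any real Samza API.
class GuardedCoordinatorConsumer {
    private final Set<String> registered = new LinkedHashSet<>();
    private boolean started = false;

    /** Returns true only if the partition was newly registered. */
    boolean register(String partition) {
        if (started) {
            // Late registration from a second manager: skip it instead of
            // forwarding the partition to the already running consumer.
            return false;
        }
        return registered.add(partition);
    }

    void start() {
        started = true;
    }

    Set<String> registeredPartitions() {
        return registered;
    }
}

class GuardedRegistrationDemo {
    public static void main(String[] args) {
        GuardedCoordinatorConsumer consumer = new GuardedCoordinatorConsumer();
        consumer.register("coordinator-0");                    // first manager
        consumer.start();
        boolean accepted = consumer.register("coordinator-0"); // second manager
        System.out.println("late registration accepted: " + accepted);
    }
}
```

Skipping silently keeps both managers working against the same consumer instance; whether to log or throw on a late registration is a separate design choice that this sketch leaves open.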

      The long-term fix is to manage the life-cycle states of SystemConsumers more rigorously, using a state machine that allows only certain transitions, e.g. init -> registered -> started -> stopped.
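
One way such a state machine could look is sketched below. The names ConsumerState and ConsumerLifecycle are hypothetical and do not exist in Samza. The sketch allows only the strict chain named above; a real design would likely need more transitions (for example, registering several partitions before starting), which are deliberately left out here.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical life-cycle states, mirroring init -> registered -> started -> stopped.
enum ConsumerState {
    INIT, REGISTERED, STARTED, STOPPED;

    /** States that may directly follow this one. */
    Set<ConsumerState> allowedNext() {
        switch (this) {
            case INIT:       return EnumSet.of(REGISTERED);
            case REGISTERED: return EnumSet.of(STARTED);
            case STARTED:    return EnumSet.of(STOPPED);
            default:         return EnumSet.noneOf(ConsumerState.class);
        }
    }
}

// Tracks the current state and rejects any transition that is not allowed.
class ConsumerLifecycle {
    private ConsumerState state = ConsumerState.INIT;

    void transitionTo(ConsumerState next) {
        if (!state.allowedNext().contains(next)) {
            throw new IllegalStateException(
                "Illegal transition: " + state + " -> " + next);
        }
        state = next;
    }

    ConsumerState current() {
        return state;
    }
}

class ConsumerLifecycleDemo {
    public static void main(String[] args) {
        ConsumerLifecycle lifecycle = new ConsumerLifecycle();
        lifecycle.transitionTo(ConsumerState.REGISTERED);
        lifecycle.transitionTo(ConsumerState.STARTED);
        try {
            // Registering after start is the scenario from this report.
            lifecycle.transitionTo(ConsumerState.REGISTERED);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a model like this, registering after start fails fast at the call site instead of surfacing later as a retry loop inside the consumer.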

      Attachments

        1. SAMZA-789-0.patch
          5 kB
          Yi Pan

          People

            Assignee: Yi Pan (nickpan47)
            Reporter: Yi Pan (nickpan47)
            Votes: 0
            Watchers: 2
