SAMZA-789

CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.0
    • Component/s: None
    • Labels: None

      Description

      The following bug has been observed in 0.10:
      1) When the coordinator stream is enabled in 0.10, the OffsetManager starts the CheckpointManager internally and registers and starts the enclosed SystemConsumer first.
      2) After the container has started, the LocalityManager starts and registers the enclosed SystemConsumer again, even though that consumer has already started.

      This causes a problem in KafkaSystemConsumer, which emits the following warning logs in an endless retry loop in the KafkaSystemConsumer.refreshBrokers() function:

      2015-10-06 17:10:24 BrokerProxy [DEBUG] Adding new topic and partition [__samza_coordinator_acg-test_i001,0] to queue for lca1-app0908.corp.linkedin.com
      2015-10-06 17:10:24 KafkaSystemConsumer [WARN] While refreshing brokers for [__samza_coordinator_acg-test_i001,0]: org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]. Retrying.
      2015-10-06 17:10:24 KafkaSystemConsumer [DEBUG] Exception detail:
      org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]
      	at org.apache.samza.system.kafka.Toss$class.toss(Toss.scala:27)
      	at org.apache.samza.system.kafka.BrokerProxy.toss(BrokerProxy.scala:51)
      	at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:96)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:165)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:178)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:144)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:143)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.refreshDropped(KafkaSystemConsumer.scala:195)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:142)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
      	at java.lang.Thread.run(Thread.java:745)
      

      The quick fix is to avoid registering the same coordinator stream system partition after the CoordinatorStreamSystemConsumer has already started.
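
A minimal sketch of the guard described in the quick fix, assuming the consumer tracks a "started" flag. The class and method names here (GuardedCoordinatorConsumer, registeredCount) are hypothetical and are not taken from the attached patch or from Samza's API; the point is only that a register call arriving after start is skipped rather than forwarded to the underlying consumer.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a coordinator stream consumer; not Samza's actual class.
class GuardedCoordinatorConsumer {
    private final Set<String> registeredPartitions = new LinkedHashSet<>();
    private boolean started = false;

    /**
     * Registers a partition only while the consumer has not started.
     * Returns true if the registration was accepted, false if it was skipped.
     */
    synchronized boolean register(String partition) {
        if (started) {
            // Late registration is what leads to "Already consuming TopicPartition".
            return false;
        }
        registeredPartitions.add(partition);
        return true;
    }

    /** Marks the consumer as started; later register calls become no-ops. */
    synchronized void start() {
        started = true;
    }

    synchronized int registeredCount() {
        return registeredPartitions.size();
    }
}

class GuardedRegistrationDemo {
    public static void main(String[] args) {
        GuardedCoordinatorConsumer consumer = new GuardedCoordinatorConsumer();
        // First caller registers and starts, as the CheckpointManager path does.
        System.out.println(consumer.register("coordinator-partition-0"));
        consumer.start();
        // Second caller registers after start, as the LocalityManager path does.
        System.out.println(consumer.register("coordinator-partition-0"));
    }
}
```

With this guard the second caller simply reuses the already running consumer, so the partition is never added twice.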

      The long-term fix is to manage the life-cycle states of the SystemConsumers more rigorously with a state machine model that allows only certain state transitions, e.g. init -> registered -> started -> stopped.
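
The state machine idea could look roughly like the following sketch. The enum and class names (ConsumerState, ConsumerLifecycle) are hypothetical, and allowing registered -> registered (to register more than one partition before start) is an assumption that the issue does not state. Any transition outside the table is rejected with an exception instead of being retried silently.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical life-cycle states, following init -> registered -> started -> stopped.
enum ConsumerState {
    INIT, REGISTERED, STARTED, STOPPED;

    /** States that may legally follow this one. */
    Set<ConsumerState> allowedNext() {
        switch (this) {
            case INIT:
                return EnumSet.of(REGISTERED);
            case REGISTERED:
                // Assumption: several partitions may be registered before start.
                return EnumSet.of(REGISTERED, STARTED);
            case STARTED:
                return EnumSet.of(STOPPED);
            default:
                // STOPPED is terminal.
                return EnumSet.noneOf(ConsumerState.class);
        }
    }
}

// Hypothetical holder that enforces the transition table above.
class ConsumerLifecycle {
    private ConsumerState state = ConsumerState.INIT;

    synchronized void transitionTo(ConsumerState next) {
        if (!state.allowedNext().contains(next)) {
            throw new IllegalStateException(
                "Illegal transition: " + state + " -> " + next);
        }
        state = next;
    }

    synchronized ConsumerState current() {
        return state;
    }
}

class ConsumerLifecycleDemo {
    public static void main(String[] args) {
        ConsumerLifecycle lifecycle = new ConsumerLifecycle();
        lifecycle.transitionTo(ConsumerState.REGISTERED);
        lifecycle.transitionTo(ConsumerState.STARTED);
        try {
            // The sequence from this issue: register after the consumer started.
            lifecycle.transitionTo(ConsumerState.REGISTERED);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this model the late registration seen in this bug fails fast at the call site rather than surfacing later as a retry loop in the broker refresh path.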

        Attachments

        1. SAMZA-789-0.patch
          5 kB
          Yi Pan (Data Infrastructure)

            People

            • Assignee: nickpan47 Yi Pan (Data Infrastructure)
            • Reporter: nickpan47 Yi Pan (Data Infrastructure)