Samza / SAMZA-1088

TestStreamProcessor hangs during job initialization because the Kafka broker is not available

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.12.0
    • Fix Version/s: 0.15.0
    • Component/s: None
    • Labels:
      None

      Description

      On my Mac build, running TestStreamProcessor hangs with the following output:

      01:35:13.680 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_OUT
      01:35:13.680 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.678 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$TopicChangeListener [INFO] [TopicChangeListener on Controller 0]: New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()]
      01:35:13.683 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.683 [ZkClient-EventThread-28-127.0.0.1:53690] ZookeeperLeaderElector$LeaderChangeListener [INFO] New leader is 0
      01:35:13.686 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.686 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/numbers
      01:35:13.691 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.690 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/output
      01:35:13.693 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.693 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 0
      01:35:13.697 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.696 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on Controller 0]: Newly added brokers: , deleted brokers: , all live brokers: 0
      01:35:17.678 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.677 [Test worker] VerifiableProperties [INFO] Verifying properties
      01:35:17.678 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property client.id is overridden to samza_admin-test_job-1
      01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property metadata.broker.list is overridden to :53693
      01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property request.timeout.ms is overridden to 30000
      01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test worker] ClientUtils$ [INFO] Fetching metadata from broker BrokerEndPoint(0,,53693) with correlation id 0 for 1 topic(s) Set(numbers)
      01:35:17.680 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Connected to :53693 for producing
      01:35:17.681 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Disconnecting from :53693
      01:35:17.683 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test worker] ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for topics [Set(numbers)] from broker [BrokerEndPoint(0,,53693)] failed
      01:35:17.683 [DEBUG] [TestEventLogger]     java.nio.channels.ClosedChannelException
      01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
      01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
      01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
      01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
      01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
      01:35:17.683 [DEBUG] [TestEventLogger]          at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
      01:35:17.683 [DEBUG] [TestEventLogger]          at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
      01:35:17.683 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:373)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:155)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:154)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:153)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:147)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:67)
      01:35:17.684 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
      01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
      01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      01:35:17.685 [DEBUG] [TestEventLogger]          at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
      01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:62)
      01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getInputStreamPartitions(JobModelManager.scala:143)
      01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getMatchedInputStreamPartitions(JobModelManager.scala:154)
      01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobModelManager.scala:193)
      01:35:17.685 [DEBUG] [TestEventLogger]          at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobModelManager.scala:125)
      01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.standalone.StandaloneJobCoordinator.<init>(StandaloneJobCoordinator.java:108)
      01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.standalone.StandaloneJobCoordinatorFactory.getJobCoordinator(StandaloneJobCoordinatorFactory.java:29)
      01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:134)
      01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:111)
      01:35:17.686 [DEBUG] [TestEventLogger]          at org.apache.samza.test.processor.TestStreamProcessor.testStreamProcessor(TestStreamProcessor.java:72)
      01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      01:35:17.686 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      01:35:17.686 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
      01:35:17.686 [DEBUG] [TestEventLogger]          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
      01:35:17.686 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
      01:35:17.687 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
      01:35:17.688 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
      01:35:17.688 [DEBUG] [TestEventLogger]          at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
      01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
      01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
      01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
      01:35:17.688 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
      01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      01:35:17.688 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      01:35:17.689 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
      01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
      01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
      01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
      01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
      01:35:17.689 [DEBUG] [TestEventLogger]          at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
      01:35:17.689 [DEBUG] [TestEventLogger]          at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
      01:35:17.689 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      01:35:17.690 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      01:35:17.690 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      01:35:17.690 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:483)
      01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
      01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
      01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
      01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
      01:35:17.690 [DEBUG] [TestEventLogger]          at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
      01:35:17.690 [DEBUG] [TestEventLogger]          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      01:35:17.691 [DEBUG] [TestEventLogger]          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      01:35:17.691 [DEBUG] [TestEventLogger]          at java.lang.Thread.run(Thread.java:745)
      01:35:17.691 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.681 [Test worker] SyncProducer [INFO] Disconnecting from :53693
      01:35:17.691 [DEBUG] [TestEventLogger] 
      01:35:17.691 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_ERROR
      01:35:17.691 [DEBUG] [TestEventLogger]     113333 [Test worker] WARN org.apache.samza.system.kafka.KafkaSystemAdmin - Unable to fetch last offsets for streams [numbers] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(numbers)] from broker [ArrayBuffer(BrokerEndPoint(0,,53693))] failed. Retrying.
      

      It turns out that job initialization fails while fetching metadata for the input topics (KafkaSystemAdmin.getTopicMetadata, called from JobModelManager.initializeJobModel). It keeps retrying because the connection to the broker fails; at that moment, the broker cannot be reached.
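      The stack trace shows the metadata lookup running inside org.apache.samza.util.ExponentialSleepStrategy.run, and, as described above, it keeps retrying while the broker is unreachable, so the test blocks in the StreamProcessor constructor instead of failing. Below is a minimal sketch of a bounded alternative, assuming a hypothetical helper BoundedRetry (not part of Samza, and not the signature of ExponentialSleepStrategy): each failure multiplies the delay by a back-off factor up to a cap, and after maxAttempts failures the last error is rethrown wrapped in an exception, so an unreachable broker surfaces as a test failure rather than a hang.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper (not Samza's API): exponential back-off with an attempt limit. */
public final class BoundedRetry {

  /** Thrown when every attempt has failed; the cause is the last failure seen. */
  public static final class RetriesExhaustedException extends Exception {
    public RetriesExhaustedException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  private BoundedRetry() {}

  /**
   * Calls action up to maxAttempts times. After each failure except the last, sleeps
   * for the current delay, then multiplies the delay by backOffMultiplier, capped at maxDelayMs.
   */
  public static <T> T run(Callable<T> action, int maxAttempts, long initialDelayMs,
                          double backOffMultiplier, long maxDelayMs)
      throws RetriesExhaustedException, InterruptedException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    long delayMs = initialDelayMs;
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (InterruptedException e) {
        throw e; // never swallow interrupts
      } catch (Exception e) {
        lastFailure = e;
        if (attempt == maxAttempts) {
          break; // no sleep after the final failure
        }
        System.err.printf("Attempt %d/%d failed (%s); retrying in %d ms%n",
            attempt, maxAttempts, e, delayMs);
        Thread.sleep(delayMs);
        delayMs = Math.min(maxDelayMs, (long) (delayMs * backOffMultiplier));
      }
    }
    throw new RetriesExhaustedException("Gave up after " + maxAttempts + " attempts", lastFailure);
  }

  public static void main(String[] args) throws InterruptedException {
    // Simulates a metadata fetch against a broker that never accepts connections.
    try {
      run(() -> { throw new IOException("broker unreachable"); }, 3, 10, 2.0, 100);
    } catch (RetriesExhaustedException e) {
      System.out.println("Failed fast: " + e.getMessage() + " (last error: " + e.getCause() + ")");
    }
  }
}
```

The attempt limit and delays in main are illustrative only; the point of the design is that the caller, not the retry loop, decides when an unreachable broker becomes an error.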

      Note that this only happens on Mac, and only when the test is run via Gradle:

      ./gradlew clean :samza-test:build -Dtest.single=TestStreamProcessor --debug
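      When reproducing with the command above, it may be worth checking the broker address the job actually uses. In the log, metadata.broker.list is overridden to ":53693" and the endpoint is printed as BrokerEndPoint(0,,53693); in both, the host part is empty. This report does not establish whether that is the cause of the failed connection. The sketch below is a hypothetical diagnostic helper, BrokerListCheck (not part of Samza or Kafka), that flags entries of a comma-separated host:port list whose host is empty.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic (not Samza/Kafka API): finds host:port entries with an empty host. */
public final class BrokerListCheck {

  private BrokerListCheck() {}

  /** Returns the trimmed entries of a comma-separated host:port list that have no host part. */
  public static List<String> entriesWithEmptyHost(String brokerList) {
    List<String> bad = new ArrayList<>();
    for (String raw : brokerList.split(",")) {
      String entry = raw.trim();
      if (entry.isEmpty()) {
        continue; // ignore stray commas
      }
      // The host is everything before the last ':'; with no ':' the whole entry is the host.
      int colon = entry.lastIndexOf(':');
      String host = colon < 0 ? entry : entry.substring(0, colon);
      if (host.trim().isEmpty()) {
        bad.add(entry);
      }
    }
    return bad;
  }

  public static void main(String[] args) {
    String fromLog = ":53693"; // value of metadata.broker.list in the log above
    List<String> bad = entriesWithEmptyHost(fromLog);
    if (!bad.isEmpty()) {
      System.out.println("Broker entries with an empty host: " + bad);
    }
  }
}
```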
      


              People

              • Assignee: Unassigned
              • Reporter: nickpan47 Yi Pan (Data Infrastructure)
              • Votes: 0
              • Watchers: 2
