  1. Samza
  2. SAMZA-1647

Fix race conditions in StreamProcessor.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.14.1
    • Component/s: None
    • Labels:
      None

      Description

      Problem:

      There is a race condition when container shutdown is triggered from the onJobModelExpired event handler. The race leads to a NullPointerException (NPE), which eventually kills the StreamProcessor.

      Reason:

      The JobModelExpired handler, executed from the DebounceThread, does the following:
      1. Invoke container shutdown.
      2. Wait up to the configured task.shutdown.ms on a latch (L).
      3. Log the containerId once the wait returns.

      The onContainerFailed handler, executed from the ContainerThread, does the following:
      1. Count down the latch (L).
      2. Set the container field to null.
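The handshake between the two handlers can be sketched roughly as below. This is a simplified model, not the actual StreamProcessor code: the class name HandlerSketch, the field names, and the method signatures are all hypothetical, and the real shutdown call and logging are omitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the processor; not the real StreamProcessor. */
class HandlerSketch {
    // Shared between the two threads; volatile so the write is visible across threads.
    volatile Object container = new Object();
    // The latch "L" from the description.
    final CountDownLatch shutdownLatch = new CountDownLatch(1);

    /** Debounce-thread side: steps 1 and 2 (step 3, the logging, is left out). */
    boolean onJobModelExpired(long taskShutdownMs) throws InterruptedException {
        // Step 1 would request container shutdown here (omitted in this sketch).
        // Step 2: block until the latch is released or the timeout elapses.
        return shutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
    }

    /** Container-thread side: steps 1 and 2. */
    void onContainerFailed() {
        shutdownLatch.countDown(); // step 1: releases the waiting debounce thread
        container = null;          // step 2: clears the shared field
    }
}
```

Once countDown() has been called, the waiting thread is free to continue, so nothing orders the container thread's second step relative to whatever the debounce thread does after its wait.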

      If step 2 of the onContainerFailed handler executes before step 3 of the JobModelExpired handler, the container field is already null when the JobModelExpired handler tries to log it. This results in an NPE.
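To make the interleaving concrete, here is a minimal sketch of the racy pattern next to one possible mitigation: reading the shared field once into a local variable before waiting. All names (RaceSketch, FakeContainer, expiredRacy, expiredSafe) are hypothetical, and the fix actually applied in Samza may differ from this.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the race; names are illustrative, not Samza's. */
class RaceSketch {
    /** Stand-in for the container; only exposes an id for logging. */
    static final class FakeContainer {
        String id() { return "0000000000"; }
    }

    volatile FakeContainer container = new FakeContainer();
    final CountDownLatch latch = new CountDownLatch(1);

    /** Container thread: count down the latch, then clear the field. */
    void onContainerFailed() {
        latch.countDown();
        container = null;
    }

    /** Racy variant: dereferences the shared field after the wait. */
    String expiredRacy(long waitMs) throws InterruptedException {
        latch.await(waitMs, TimeUnit.MILLISECONDS);
        // Throws NullPointerException if the field was cleared in the meantime.
        return "container " + container.id() + " stopped";
    }

    /** Safer variant: snapshot the field once and use only the local copy. */
    String expiredSafe(long waitMs) throws InterruptedException {
        FakeContainer local = container;
        if (local == null) {
            return "no container to stop";
        }
        latch.await(waitMs, TimeUnit.MILLISECONDS);
        return "container " + local.id() + " stopped";
    }
}
```

Calling onContainerFailed() before expiredRacy() forces the bad ordering deterministically and triggers the NPE. expiredSafe() never dereferences the shared field after the wait, so the container thread clearing it cannot cause an NPE there.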

      Relevant stacktrace: 

      [p-0000000000-container-thread-0] ERROR org.apache.samza.processor.StreamProcessor - Container failed. Stopping the processor.
      org.apache.samza.system.SystemProducerException: Flush failed. One or more batches of messages were not sent!
      at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:159)
      at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
      at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
      at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
      at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
      at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:130)
      at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
      at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
      at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
      at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:69)
      at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:210)
      at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
      at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
      at org.apache.samza.container.SamzaContainer.shutdownTask(SamzaContainer.scala:1024)
      at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:731)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      
      Caused by: org.apache.samza.system.SystemProducerException: Failed to send message for Source: TaskName-SystemStreamPartition [TestSystemName, test-input-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895, 0] on System:TestSystemName Topic:test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895
      at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:109)
      at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
      at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
      at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:600)
      at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:272)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
      ... 1 more
      Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895-0: 30003 ms has passed since batch creation plus linger time
      
      [p-0000000000-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
      
      [debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - ShutdownComplete=true
      
      [debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down container done for pid=0000000000; complete =true
      
      [debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Encountered errors during job coordinator stop.
      
      java.lang.NullPointerException
      at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:235)
      at org.apache.samza.zk.ZkJobCoordinator.stop(ZkJobCoordinator.java:163)
      at org.apache.samza.zk.ZkJobCoordinator$ZkSessionStateChangedListener.lambda$handleStateChanged$0(ZkJobCoordinator.java:449)
      at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:153)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      

              People

              • Assignee:
                spvenkat Shanthoosh Venkataraman
                Reporter:
                spvenkat Shanthoosh Venkataraman