Uploaded image for project: 'Samza'
  1. Samza
  2. SAMZA-1406

Fix potential orphaned containers problem in stand alone.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 0.14.0
    • None
    • None

    Description

      When stream processor is shutting down, we can see that the already submitted Zk tasks to ScheduleAfterDebounce task queue is picked up and executed.  Here's the sample stacktrace:

      111018 [p-0000000002-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
      111043 [SessionTracker] INFO org.apache.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!
      111703 [Thread-43-SendThread(127.0.0.1:64983)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64983. Will not attempt to authenticate using SASL (unknown error)
      111704 [Thread-43-SendThread(127.0.0.1:64983)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15e05975d900005 for server null, unexpected error, closing socket connection and attempting reconnect
      java.net.ConnectException: Connection refused
      	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
      	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
      	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
      111735 [Thread-16-SendThread(127.0.0.1:64594)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64594. Will not attempt to authenticate using SASL (unknown error)
      111839 [p-0000000002-container-thread-0] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15e059731f90005 closed
      111839 [Thread-16-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x15e059731f90005
      111839 [p-0000000002-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down the executor service.
      111840 [CONTAINER-SHUTDOWN-HOOK] INFO org.apache.samza.container.SamzaContainer - Shutdown complete
      111872 [Thread-43-SendThread(127.0.0.1:64983)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64983. Will not attempt to authenticate using SASL (unknown error)
      111872 [Thread-43-SendThread(127.0.0.1:64983)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15e05975d900004 for server null, unexpected error, closing socket connection and attempting reconnect
      java.net.ConnectException: Connection refused
      	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
      	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
      	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
      111941 [debounce-thread-0] ERROR org.apache.samza.zk.ScheduleAfterDebounceTime - OnProcessorChange threw an exception.
      org.apache.samza.SamzaException: Cannot read ZK node: /app-test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1/test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-coordinationData/processors/0000000000
      	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:230)
      	at org.apache.samza.zk.ZkUtils.getActiveProcessorsIDs(ZkUtils.java:253)
      	at org.apache.samza.zk.ZkJobCoordinator.getActualProcessorIds(ZkJobCoordinator.java:256)
      	at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:167)
      	at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:161)
      	at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$scheduleAfterDebounceTime$0(ScheduleAfterDebounceTime.java:95)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.IllegalStateException: ZkClient already closed!
      	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
      	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
      	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
      	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1083)
      	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:224)
      	... 12 more
      111942 [debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Received exception from in JobCoordinator Processing!
      org.apache.samza.SamzaException: Cannot read ZK node: /app-test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1/test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-coordinationData/processors/0000000000
      	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:230)
      	at org.apache.samza.zk.ZkUtils.getActiveProcessorsIDs(ZkUtils.java:253)
      	at org.apache.samza.zk.ZkJobCoordinator.getActualProcessorIds(ZkJobCoordinator.java:256)
      	at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:167)
      	at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:161)
      	at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$scheduleAfterDebounceTime$0(ScheduleAfterDebounceTime.java:95)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.SchedeadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.IllegalStateException: ZkClient already closed!
      	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
      	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
      	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
      	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1083)
      	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:224)
      	... 12 more
      111944 [debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down the executor service.
      111962 [p-0000000000-container-thread-0] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15e059709020003 closed
      

       
      Reason being, we call `executorService.shutdownNow();` from StreamProcessor, which just sets the interrupt flag to true for running threads in task queue.
       
      We don't check if the thread is interrupted or not when starting to execute or while executing in the task implementation.

      User thread is shutting down the stream processor and at the same time due to an external event, the executorService task is bringing the stream processor back up(and the task is not killed by executorService.shutdown depending upon timing of events). 

      Actual implementation detail where this could happen is `debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version))` and LocalApplicationRunner.kill(streamApplication) triggered at the same time. 

      In worst case this will lead to orphaned containers problem, since the job coordinator associated with stream application is stopped and the stream processor is still running(provided that there're multiple stream processors in a JVM).

      Attachments

        Issue Links

          Activity

            People

              spvenkat Shanthoosh Venkataraman
              spvenkat Shanthoosh Venkataraman
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: