Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
Description
When the stream processor is shutting down, ZK tasks that were already submitted to the ScheduleAfterDebounceTime task queue are still picked up and executed. Here's a sample stack trace:
111018 [p-0000000002-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
111043 [SessionTracker] INFO org.apache.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!
111703 [Thread-43-SendThread(127.0.0.1:64983)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64983. Will not attempt to authenticate using SASL (unknown error)
111704 [Thread-43-SendThread(127.0.0.1:64983)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15e05975d900005 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
111735 [Thread-16-SendThread(127.0.0.1:64594)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64594. Will not attempt to authenticate using SASL (unknown error)
111839 [p-0000000002-container-thread-0] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15e059731f90005 closed
111839 [Thread-16-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x15e059731f90005
111839 [p-0000000002-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down the executor service.
111840 [CONTAINER-SHUTDOWN-HOOK] INFO org.apache.samza.container.SamzaContainer - Shutdown complete
111872 [Thread-43-SendThread(127.0.0.1:64983)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64983. Will not attempt to authenticate using SASL (unknown error)
111872 [Thread-43-SendThread(127.0.0.1:64983)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15e05975d900004 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
111941 [debounce-thread-0] ERROR org.apache.samza.zk.ScheduleAfterDebounceTime - OnProcessorChange threw an exception.
org.apache.samza.SamzaException: Cannot read ZK node: /app-test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1/test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-coordinationData/processors/0000000000
    at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:230)
    at org.apache.samza.zk.ZkUtils.getActiveProcessorsIDs(ZkUtils.java:253)
    at org.apache.samza.zk.ZkJobCoordinator.getActualProcessorIds(ZkJobCoordinator.java:256)
    at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:167)
    at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:161)
    at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$scheduleAfterDebounceTime$0(ScheduleAfterDebounceTime.java:95)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: ZkClient already closed!
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1083)
    at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:224)
    ... 12 more
111942 [debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Received exception from in JobCoordinator Processing!
org.apache.samza.SamzaException: Cannot read ZK node: /app-test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1/test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-coordinationData/processors/0000000000
    at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:230)
    at org.apache.samza.zk.ZkUtils.getActiveProcessorsIDs(ZkUtils.java:253)
    at org.apache.samza.zk.ZkJobCoordinator.getActualProcessorIds(ZkJobCoordinator.java:256)
    at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:167)
    at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:161)
    at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$scheduleAfterDebounceTime$0(ScheduleAfterDebounceTime.java:95)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: ZkClient already closed!
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1083)
    at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:224)
    ... 12 more
111944 [debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down the executor service.
111962 [p-0000000000-container-thread-0] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15e059709020003 closed
The reason is that StreamProcessor calls `executorService.shutdownNow();`, which only sets the interrupt flag on the threads that are running tasks from the task queue; it does not forcibly stop them.
The task implementation does not check whether its thread has been interrupted, either before it starts executing or while it is executing.
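As an illustration of the missing check, here is a minimal sketch using only `java.util.concurrent`. The class `InterruptAwareTaskSketch` and the helper `interruptAware` are hypothetical names made up for this example and are not Samza APIs. The sketch shows a task that is already running when `shutdownNow()` is called, and how a guard on `Thread.currentThread().isInterrupted()` lets it skip its remaining work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptAwareTaskSketch {

    // Hypothetical helper (not a Samza API): runs the action only if the
    // current thread has not been interrupted.
    static Runnable interruptAware(Runnable action) {
        return () -> {
            if (Thread.currentThread().isInterrupted()) {
                return; // shutdown was requested, skip the work
            }
            action.run();
        };
    }

    // Returns true if the guarded step ran even though shutdownNow() was called.
    static boolean guardedStepRanAfterShutdownNow() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean release = new AtomicBoolean(false);
        AtomicBoolean guardedStepRan = new AtomicBoolean(false);

        executor.submit(() -> {
            started.countDown();
            // Step 1: work that ignores interruption, so shutdownNow() cannot stop it.
            while (!release.get()) {
                // spin until the caller releases us
            }
            // Step 2: guarded by the interrupt check.
            interruptAware(() -> guardedStepRan.set(true)).run();
        });

        try {
            started.await();               // the task is now running
            executor.shutdownNow();        // interrupts the worker thread
            release.set(true);             // let step 1 finish
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return guardedStepRan.get();
    }

    public static void main(String[] args) {
        System.out.println("guarded step ran: " + guardedStepRanAfterShutdownNow());
    }
}
```

Without the guard, step 2 would run after `shutdownNow()` has returned, which is the same kind of late execution seen on `debounce-thread-0` in the log above.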
A user thread shuts down the stream processor while, at the same time, an executorService task triggered by an external event brings the stream processor back up (depending on the timing of events, the task is not killed by the executorService shutdown).
Concretely, this can happen when `debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version))` and `LocalApplicationRunner.kill(streamApplication)` are triggered at the same time.
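One possible way to close this kind of race, sketched under assumptions, is to make the "stop" and "start on new job model" transitions mutually exclusive and to turn the start into a no-op once stop has been requested. The class `LifecycleGuardSketch` and its methods are hypothetical stand-ins for illustration only; this is not the actual Samza code and not necessarily the fix that was applied.

```java
public class LifecycleGuardSketch {
    private final Object lock = new Object();
    private boolean stopped = false;   // set once by the user thread
    private int containerStarts = 0;   // stands in for "bring the processor up"

    // Called by the user thread (analogous to a kill/stop request).
    void stop() {
        synchronized (lock) {
            stopped = true;
        }
    }

    // Called from the debounce task (analogous to a job model confirmation).
    // Returns false and does nothing if stop() has already been requested.
    boolean onNewJobModelConfirmed() {
        synchronized (lock) {
            if (stopped) {
                return false;
            }
            containerStarts++;
            return true;
        }
    }

    int containerStarts() {
        synchronized (lock) {
            return containerStarts;
        }
    }

    public static void main(String[] args) {
        LifecycleGuardSketch processor = new LifecycleGuardSketch();
        System.out.println(processor.onNewJobModelConfirmed()); // start before stop
        processor.stop();
        System.out.println(processor.onNewJobModelConfirmed()); // start after stop
        System.out.println(processor.containerStarts());
    }
}
```

Because both methods take the same lock, a confirmation that arrives after `stop()` cannot restart anything, regardless of which thread wins the race.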
In the worst case, this leads to orphaned containers: the job coordinator associated with the stream application is stopped while the stream processor is still running (provided that there are multiple stream processors in a JVM).