Details
Type: Bug
Status: Closed
Priority: Critical
Resolution: Not A Problem
Description
When the FlinkMiniCluster is created for HA tests with ZooKeeper, the shutdown is unstable.
It looks like ZooKeeper may be shut down before the JobManager is shut down, causing the shutdown procedure of the JobManager (specifically ZooKeeperSubmittedJobGraphStore.removeJobGraph) to block until tests time out.
Full log: https://api.travis-ci.org/v3/job/346853707/log.txt
Note that no ZK threads are alive any more, so it seems ZK has already been shut down.
Relevant Stack Traces:
"main" #1 prio=5 os_prio=0 tid=0x00007f973800a800 nid=0x43b4 waiting on condition [0x00007f973eb0b000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000000008966cf18> (a scala.concurrent.impl.Promise$CompletionLatch)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
    at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
    at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.ready(package.scala:169)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:469)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:435)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.closeAsync(FlinkMiniCluster.scala:719)
    at org.apache.flink.test.util.MiniClusterResource.after(MiniClusterResource.java:104)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)
    ...
"flink-akka.actor.default-dispatcher-2" #1012 prio=5 os_prio=0 tid=0x00007f97394fa800 nid=0x3328 waiting on condition [0x00007f971db29000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x0000000087f82a70> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.internalBlockUntilConnectedOrTimedOut(CuratorZookeeperClient.java:336)
    at org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:241)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:225)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:35)
    at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.release(ZooKeeperStateHandleStore.java:478)
    at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.releaseAndTryRemove(ZooKeeperStateHandleStore.java:435)
    at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.releaseAndTryRemove(ZooKeeperStateHandleStore.java:405)
    at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.removeJobGraph(ZooKeeperSubmittedJobGraphStore.java:266)
    - locked <0x00000000807f4258> (a java.lang.Object)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$1.apply$mcV$sp(JobManager.scala:1727)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$1.apply(JobManager.scala:1723)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$1.apply(JobManager.scala:1723)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
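The suspected ordering problem from the description can be illustrated with a stdlib-only sketch. It does not use Flink, Curator, or ZooKeeper: `FakeCoordinationService` and `FakeJobManager` are hypothetical stand-ins, and the single bounded latch wait is only a rough analogue of the connection wait visible in the second stack trace (the real client also retries, which this sketch does not model). The only point it makes is that cleanup succeeds when dependents are stopped before the service they clean up in, and times out when the service is stopped first.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Stdlib-only sketch; every class here is hypothetical and only stands in for the real components. */
public class ShutdownOrderSketch {

    /** Stand-in for the ZooKeeper test server: clients can only "connect" while it is running. */
    static final class FakeCoordinationService {
        private volatile boolean running = true;

        boolean isRunning() {
            return running;
        }

        void stop() {
            running = false;
        }
    }

    /** Stand-in for a component whose shutdown must remove state held in the service. */
    static final class FakeJobManager {
        private final FakeCoordinationService service;

        FakeJobManager(FakeCoordinationService service) {
            this.service = service;
        }

        /** Returns true if cleanup completed, false if it gave up after the timeout. */
        boolean shutdown(long timeoutMillis) throws InterruptedException {
            CountDownLatch connected = new CountDownLatch(1);
            if (service.isRunning()) {
                connected.countDown(); // service reachable: the wait below returns at once
            }
            // Bounded wait for a connection; with the service gone, this runs into the timeout.
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Dependents first, service last: cleanup can still reach the service. */
    static boolean stopInDependencyOrder(long timeoutMillis) throws InterruptedException {
        FakeCoordinationService service = new FakeCoordinationService();
        FakeJobManager jobManager = new FakeJobManager(service);
        boolean cleanedUp = jobManager.shutdown(timeoutMillis);
        service.stop();
        return cleanedUp;
    }

    /** Service first: cleanup waits for a connection that never becomes available. */
    static boolean stopServiceFirst(long timeoutMillis) throws InterruptedException {
        FakeCoordinationService service = new FakeCoordinationService();
        FakeJobManager jobManager = new FakeJobManager(service);
        service.stop();
        return jobManager.shutdown(timeoutMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("dependency order cleaned up: " + stopInDependencyOrder(200));
        System.out.println("service first cleaned up: " + stopServiceFirst(200));
    }
}
```

In a JUnit setup this would correspond to tearing down the cluster resource before the ZooKeeper test server, but whether that is what went wrong in this test run is not confirmed by the report.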