Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
1.3.0
unit tests
Description
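There seems to be a race condition in the "JobManagerRegistrationTest.The JobManager should handle repeated registration calls" (Scala) unit test.
Every so often, especially when my system is under load, this test fails with a timeout after the following messages appear in the log4j output (INFO level). The JobManager discards the RegisterTaskManager and RegisterResourceManager messages because no valid leader ID is known yet, and only afterwards logs that it was granted leadership: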
14:18:42,257 INFO org.apache.flink.runtime.testutils.TestingResourceManager - Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
14:18:42,253 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/$f.
14:18:42,258 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterResourceManager akka://flink/user/resourcemanager-61e00f37-9e99-4355-9099-53b992e8232e) because there is currently no valid leader id known.
14:18:42,259 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/$f was granted leadership with leader session ID Some(00000000-0000-0000-0000-000000000000).
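The ordering in the excerpt above can be replayed with a small, self-contained Scala sketch. It is a hypothetical simplification, not Flink's JobManager code: `SessionMessage`, `SimplifiedJobManager` and `RegistrationRaceDemo` are illustrative names, and the sketch assumes (as the WARN lines suggest) that a message arriving before a leader session ID is known is dropped without any reply to the sender.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical stand-in for a message tagged with a leader session ID.
// This is NOT Flink's LeaderSessionMessage class; it only mirrors its shape.
final case class SessionMessage(leaderSessionId: UUID, payload: String)

// Hypothetical model of the JobManager's leader-session check (not Flink code).
final class SimplifiedJobManager {
  // None until leadership is granted, i.e. "no valid leader id known".
  private var leaderSessionId: Option[UUID] = None

  val handled = mutable.ArrayBuffer.empty[String]
  val discarded = mutable.ArrayBuffer.empty[String]

  def grantLeadership(id: UUID): Unit = leaderSessionId = Some(id)

  // A message is processed only if a matching leader ID is already known;
  // otherwise it is dropped and the sender never receives a reply.
  def receive(msg: SessionMessage): Unit = leaderSessionId match {
    case Some(id) if id == msg.leaderSessionId => handled += msg.payload
    case _                                     => discarded += msg.payload
  }
}

object RegistrationRaceDemo {
  // The log shows the all-zero session ID being used in the test.
  val DefaultSessionId: UUID = new UUID(0L, 0L)

  // Replays the order seen in the failing run: three TaskManager
  // registrations and one ResourceManager registration arrive before
  // leadership is granted.
  def simulate(): SimplifiedJobManager = {
    val jm = new SimplifiedJobManager
    (1 to 3).foreach(_ => jm.receive(SessionMessage(DefaultSessionId, "RegisterTaskManager")))
    jm.receive(SessionMessage(DefaultSessionId, "RegisterResourceManager"))
    jm.grantLeadership(DefaultSessionId)
    // Only a message sent after the grant is processed, like the
    // resource manager's retry after its 10 s ask timeout.
    jm.receive(SessionMessage(DefaultSessionId, "RegisterResourceManager"))
    jm
  }

  def main(args: Array[String]): Unit = {
    val jm = simulate()
    println(s"discarded: ${jm.discarded.mkString(", ")}")
    println(s"handled:   ${jm.handled.mkString(", ")}")
  }
}
```

In this model the four early registrations are discarded and only the retried resource manager registration is handled. Because a dropped message gets no reply, a caller that sends its registration once and then waits for an answer (which appears to be what the test does before its `expectMsgType` call fails) can only run into its own timeout, about 20 s in the log.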
Full log:
14:18:42,230 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-d09b13ee-26bb-4ec8-950a-956f4a6c16cf
14:18:42,231 WARN org.apache.flink.runtime.net.SSLUtils - Not a SSL socket, will skip setting tls version and cipher suites.
14:18:42,236 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:39695 - max concurrent requests: 50 - max backlog: 1000
14:18:42,247 INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
14:18:42,249 WARN org.apache.flink.runtime.metrics.MetricRegistry - Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.
akka.actor.InvalidActorNameException: actor name [MetricQueryService] is not unique!
	at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
	at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
	at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
	at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
	at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
	at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
	at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
	at org.apache.flink.runtime.metrics.dump.MetricQueryService.startMetricQueryService(MetricQueryService.java:170)
	at org.apache.flink.runtime.metrics.MetricRegistry.startQueryService(MetricRegistry.java:166)
	at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2720)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$apache$flink$runtime$jobmanager$JobManagerRegistrationTest$$startTestingJobManager(JobManagerRegistrationTest.scala:182)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerRegistrationTest.scala:123)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
	at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.withFixture(JobManagerRegistrationTest.scala:50)
	at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:950)
	at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962)
	at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:962)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.runTest(JobManagerRegistrationTest.scala:50)
	at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021)
	at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
	at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1021)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.runTests(JobManagerRegistrationTest.scala:50)
	at org.scalatest.Suite$class.run(Suite.scala:1424)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$scalatest$WordSpecLike$$super$run(JobManagerRegistrationTest.scala:50)
	at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067)
	at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
	at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$scalatest$BeforeAndAfterAll$$super$run(JobManagerRegistrationTest.scala:50)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.run(JobManagerRegistrationTest.scala:50)
	at org.scalatest.junit.JUnitRunner.run(JUnitRunner.scala:99)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
14:18:42,257 INFO org.apache.flink.runtime.testutils.TestingResourceManager - Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
14:18:42,253 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/$f.
14:18:42,258 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterResourceManager akka://flink/user/resourcemanager-61e00f37-9e99-4355-9099-53b992e8232e) because there is currently no valid leader id known.
14:18:42,259 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/$f was granted leadership with leader session ID Some(00000000-0000-0000-0000-000000000000).
14:18:42,253 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/$e
14:18:52,275 ERROR org.apache.flink.runtime.testutils.TestingResourceManager - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/$f#-1062324203)]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	at java.lang.Thread.run(Thread.java:745)
14:18:52,277 INFO org.apache.flink.runtime.testutils.TestingResourceManager - Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
14:18:52,277 INFO org.apache.flink.runtime.testutils.TestingResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/$f#-1062324203] - leader session 00000000-0000-0000-0000-000000000000
java.lang.AssertionError: assertion failed: timeout (19999906340 nanoseconds) during expectMsgClass waiting for class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
	at scala.Predef$.assert(Predef.scala:179)
	at akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423)
	at akka.testkit.TestKitBase$class.expectMsgType(TestKit.scala:396)
	at akka.testkit.TestKit.expectMsgType(TestKit.scala:718)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcV$sp(JobManagerRegistrationTest.scala:157)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(JobManagerRegistrationTest.scala:132)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(JobManagerRegistrationTest.scala:132)
	at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
	at akka.testkit.TestKit.within(TestKit.scala:718)
	at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
	at akka.testkit.TestKit.within(TestKit.scala:718)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerRegistrationTest.scala:132)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
	at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.withFixture(JobManagerRegistrationTest.scala:50)
	at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:950)
	at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962)
	at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:962)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.runTest(JobManagerRegistrationTest.scala:50)
	at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021)
	at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
	at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1021)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.runTests(JobManagerRegistrationTest.scala:50)
	at org.scalatest.Suite$class.run(Suite.scala:1424)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$scalatest$WordSpecLike$$super$run(JobManagerRegistrationTest.scala:50)
	at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067)
	at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
	at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$scalatest$BeforeAndAfterAll$$super$run(JobManagerRegistrationTest.scala:50)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
	at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.run(JobManagerRegistrationTest.scala:50)
	at org.scalatest.junit.JUnitRunner.run(JUnitRunner.scala:99)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Note that the log also contains an exception from the MetricQueryService (InvalidActorNameException: actor name [MetricQueryService] is not unique). It does not affect the unit test, apart from metrics not being reported.