Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6287

Flakey JobManagerRegistrationTest

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: JobManager, Tests
    • Labels:
    • Environment:

      unit tests

      Description

      There seems to be a race condition in the "JobManagerRegistrationTest.The JobManager should handle repeated registration calls" (scala) unit test.
      Every so often, especially when my system is under load, this test fails with a timeout after seeing the following messages in the log4j INFO outputs:

      14:18:42,257 INFO org.apache.flink.runtime.testutils.TestingResourceManager - Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
      14:18:42,253 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/$f.
      14:18:42,258 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
      14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
      14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
      14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterResourceManager akka://flink/user/resourcemanager-61e00f37-9e99-4355-9099-53b992e8232e) because there is currently no valid leader id known.
      14:18:42,259 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/$f was granted leadership with leader session ID Some(00000000-0000-0000-0000-000000000000).
      

      Full log:

       14:18:42,230 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-d09b13ee-26bb-4ec8-950a-956f4a6c16cf
      14:18:42,231 WARN org.apache.flink.runtime.net.SSLUtils - Not a SSL socket, will skip setting tls version and cipher suites.
      14:18:42,236 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:39695 - max concurrent requests: 50 - max backlog: 1000
      14:18:42,247 INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
      14:18:42,249 WARN org.apache.flink.runtime.metrics.MetricRegistry - Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.
      akka.actor.InvalidActorNameException: actor name [MetricQueryService] is not unique!
      at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
      at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
      at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
      at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
      at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
      at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
      at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
      at org.apache.flink.runtime.metrics.dump.MetricQueryService.startMetricQueryService(MetricQueryService.java:170)
      at org.apache.flink.runtime.metrics.MetricRegistry.startQueryService(MetricRegistry.java:166)
      at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2720)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$apache$flink$runtime$jobmanager$JobManagerRegistrationTest$$startTestingJobManager(JobManagerRegistrationTest.scala:182)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerRegistrationTest.scala:123)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
      at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      at org.scalatest.Transformer.apply(Transformer.scala:22)
      at org.scalatest.Transformer.apply(Transformer.scala:20)
      at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
      at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.withFixture(JobManagerRegistrationTest.scala:50)
      at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:950)
      at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962)
      at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962)
      at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
      at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:962)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.runTest(JobManagerRegistrationTest.scala:50)
      at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021)
      at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
      at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
      at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
      at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
      at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1021)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.runTests(JobManagerRegistrationTest.scala:50)
      at org.scalatest.Suite$class.run(Suite.scala:1424)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$scalatest$WordSpecLike$$super$run(JobManagerRegistrationTest.scala:50)
      at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067)
      at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067)
      at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
      at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$scalatest$BeforeAndAfterAll$$super$run(JobManagerRegistrationTest.scala:50)
      at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
      at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.run(JobManagerRegistrationTest.scala:50)
      at org.scalatest.junit.JUnitRunner.run(JUnitRunner.scala:99)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
      at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
      at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
      14:18:42,257 INFO org.apache.flink.runtime.testutils.TestingResourceManager - Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
      14:18:42,253 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/$f.
      14:18:42,258 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
      14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
      14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, heap=1556938752, managed=10,1)) because there is currently no valid leader id known.
      14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,RegisterResourceManager akka://flink/user/resourcemanager-61e00f37-9e99-4355-9099-53b992e8232e) because there is currently no valid leader id known.
      14:18:42,259 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/$f was granted leadership with leader session ID Some(00000000-0000-0000-0000-000000000000).
      14:18:42,253 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/$e
      14:18:52,275 ERROR org.apache.flink.runtime.testutils.TestingResourceManager - Resource manager could not register at JobManager
      akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/$f#-1062324203)]] after [10000 ms]
      at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
      at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
      at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
      at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
      at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
      at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
      at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
      at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
      at java.lang.Thread.run(Thread.java:745)
      14:18:52,277 INFO org.apache.flink.runtime.testutils.TestingResourceManager - Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
      14:18:52,277 INFO org.apache.flink.runtime.testutils.TestingResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/$f#-1062324203] - leader session 00000000-0000-0000-0000-000000000000
      java.lang.AssertionError: assertion failed: timeout (19999906340 nanoseconds) during expectMsgClass waiting for class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
      at scala.Predef$.assert(Predef.scala:179)
      at akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423)
      at akka.testkit.TestKitBase$class.expectMsgType(TestKit.scala:396)
      at akka.testkit.TestKit.expectMsgType(TestKit.scala:718)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcV$sp(JobManagerRegistrationTest.scala:157)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(JobManagerRegistrationTest.scala:132)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(JobManagerRegistrationTest.scala:132)
      at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
      at akka.testkit.TestKit.within(TestKit.scala:718)
      at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
      at akka.testkit.TestKit.within(TestKit.scala:718)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerRegistrationTest.scala:132)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:121)
      at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      at org.scalatest.Transformer.apply(Transformer.scala:22)
      at org.scalatest.Transformer.apply(Transformer.scala:20)
      at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
      at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.withFixture(JobManagerRegistrationTest.scala:50)
      at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:950)
      at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962)
      at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962)
      at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
      at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:962)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.runTest(JobManagerRegistrationTest.scala:50)
      at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021)
      at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
      at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
      at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
      at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
      at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1021)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.runTests(JobManagerRegistrationTest.scala:50)
      at org.scalatest.Suite$class.run(Suite.scala:1424)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$scalatest$WordSpecLike$$super$run(JobManagerRegistrationTest.scala:50)
      at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067)
      at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067)
      at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
      at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.org$scalatest$BeforeAndAfterAll$$super$run(JobManagerRegistrationTest.scala:50)
      at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
      at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
      at org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.run(JobManagerRegistrationTest.scala:50)
      at org.scalatest.junit.JUnitRunner.run(JUnitRunner.scala:99)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
      at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
      at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
      

      Note that there is also an exception for the MetricQueryService in the log which does not really affect the unit test though except for metrics not being reported.

        Attachments

          Activity

            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              NicoK Nico Kruber
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: