FLINK-6063

HA Configuration doesn't work with Flink 1.2

Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Not A Problem
    • Affects Version/s: 1.2.0
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      I have a Flink 1.2 cluster made up of 3 JobManagers and 2 TaskManagers. I start the ZooKeeper quorum from JobManager1, get confirmation that ZooKeeper has started on the other 2 JobManagers, and then start a Flink job on JobManager1.

      The flink-conf.yaml is the same on all 5 VMs, as is everything else related to Flink, because I copied the folder across all VMs as suggested in tutorials. This means jobmanager.rpc.address points to JobManager1 everywhere.

      If I turn off the VM running JobManager1, I would expect ZooKeeper to elect one of the remaining JobManagers as leader and the TaskManagers to reconnect to it. Instead, a new leader is elected but the slaves keep connecting to the old master:

      2017-03-15 10:28:28,655 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
      2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
      2017-03-15 10:28:46,606 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
      2017-03-15 10:28:52,431 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
      2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
      2017-03-15 10:29:10,489 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
      2017-03-15 10:29:10,490 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
      2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
      2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
      java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
      at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
      at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
      at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
      at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
      at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
      at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
      at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      2017-03-15 10:29:10,512 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
      2017-03-15 10:29:10,515 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
      2017-03-15 10:29:10,515 INFO org.apache.flink.runtime.taskmanager.Task - Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04) switched from RUNNING to FAILED.
      java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
      at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
      at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
      at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
      at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
      at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
      at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
      at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      2017-03-15 10:29:10,516 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
      2017-03-15 10:29:10,516 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
      2017-03-15 10:29:10,525 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
      2017-03-15 10:29:10,542 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
      2017-03-15 10:29:10,546 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
      2017-03-15 10:29:10,548 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
      2017-03-15 10:29:10,551 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Flat Map (1/1)
      2017-03-15 10:29:10,552 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@1.2.3.5:43893/user/jobmanager (attempt 1, timeout: 500 milliseconds)
      2017-03-15 10:29:10,567 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Source: Custom Source -> Flat Map (1/1)
      2017-03-15 10:29:10,632 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@1.2.3.5:43893/user/jobmanager), starting network stack and library cache.
      2017-03-15 10:29:10,633 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be /1.2.3.5:42830. Starting BLOB cache.
      2017-03-15 10:29:10,633 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-d97e08db-d2f1-4f00-a7d1-30c2f5823934
      2017-03-15 10:29:15,551 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
      2017-03-15 10:29:20,571 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
      2017-03-15 10:29:25,582 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
      2017-03-15 10:29:30,592 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]

      I modified the original IPs to 1.2.3.4 for JobManager1 and 1.2.3.5 for JobManager2 for confidentiality.
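
      To make the excerpt above easier to read, here is a minimal Python sketch that scans TaskManager log lines for the last successful JobManager registration and counts the failed-association warnings per remote address. The function name summarize and the two regular expressions are my own assumptions, written against the message wording shown in this excerpt only; they are not part of Flink or Akka. The sample lines are copied from the log above.

```python
import re
from collections import Counter

# Assumed patterns, derived only from the log wording in this issue.
REGISTERED = re.compile(
    r"Successful registration at JobManager \((akka\.tcp://[^/]+)/user/jobmanager\)"
)
GATED = re.compile(
    r"Association with remote system \[(akka\.tcp://[^\]]+)\] has failed"
)


def summarize(lines):
    """Return (address of the last successful registration, failed-association count per address)."""
    leader = None
    failures = Counter()
    for line in lines:
        match = REGISTERED.search(line)
        if match:
            leader = match.group(1)
            continue
        match = GATED.search(line)
        if match:
            failures[match.group(1)] += 1
    return leader, failures


# Three lines taken verbatim from the excerpt above.
SAMPLE = [
    "2017-03-15 10:29:10,632 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@1.2.3.5:43893/user/jobmanager), starting network stack and library cache.",
    "2017-03-15 10:29:15,551 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]",
    "2017-03-15 10:29:20,571 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]",
]

leader, failures = summarize(SAMPLE)
print("last registered JobManager:", leader)
print("failed associations:", dict(failures))
```

      On these sample lines the sketch reports a registration at the 1.2.3.5 address, while the warnings all name the old 1.2.3.4 address. That is only one reading of the excerpt, not a confirmed diagnosis, but it is consistent with the TaskManager having registered at JobManager2 while Akka remoting keeps retrying the address of the stopped JobManager1.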

      Attachments

        1. zoo.cfg
          0.5 kB
          Razvan
        2. slaves
          0.0 kB
          Razvan
        3. masters
          0.1 kB
          Razvan
        4. Logs.tar.gz
          57 kB
          Razvan
        5. flink-conf.yaml
          7 kB
          Razvan

          People

            Assignee: Unassigned
            Reporter: Razvan