  1. Flink
  2. FLINK-10011

Old job resurrected during HA failover

    Details

      Description

      For the second time we've observed Flink resurrect an old job during JobManager high-availability failover.

      Configuration

      • AWS environment
      • Flink 1.4.2 standalone cluster in HA mode
      • 2 JMs, 3 TMs
      • 3 node ZK ensemble
      • 1 job consuming from and producing to Kafka
      • Checkpoints in S3 using the Presto file system adaptor

      Timeline 

      • 15:18:10 JM 2 completes checkpoint 69256.
      • 15:19:10 JM 2 completes checkpoint 69257.
      • 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException
      • 15:19:57 ZK 1 closes connection to JM 2 (leader)
      • 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
      • 15:19:57 JM 2 reports it can't read data from ZK
        • Unable to read additional data from server sessionid 0x30000003f4a0003, likely server has closed socket, closing socket connection and attempting reconnect
        • org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn
      • 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
        • Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
        • ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not monitored (temporarily).
        • Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election 
        • Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
      • 15:19:57 JM 2 gives up leadership
        • JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.
      • 15:19:57 JM 2 changes job 2a4eff355aef849c5ca37dbac04f2ff1 status to SUSPENDED
        • Stopping checkpoint coordinator for job 2a4eff355aef849c5ca37dbac04f2ff1
      • 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages because there is no leader
        • Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no valid leader id known.
      • 15:19:57 JM 2 connects to ZK 2 and renews its session
        • Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181
        • Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session
        • Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
        • Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 0x30000003f4a0003, negotiated timeout = 40000
        • Connection to ZooKeeper was reconnected. Leader election can be restarted.
        • ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.
        • State change: RECONNECTED
      • 15:19:57 JM 1 reports it has been granted leadership:
        • JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).
      • 15:19:57 JM 2 reports the job has been suspended
        • org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.
        • Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.
      • 15:19:57 JM 2 reports it has lost leadership:
      • 15:19:57 TMs register with JM 1
      • 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
        • Attempting to recover all jobs.
        • There are 2 jobs to recover. Starting the job recovery.
        • Attempting to recover job 2a4eff355aef849c5ca37dbac04f2ff1.
        • Attempting to recover job 61bca496065cd05e4263070a5e923a05.
      • 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
        • Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1
        • Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2 Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2
      • 15:29:08 JM 1 starts to recover job 2a4eff355aef849c5ca37dbac04f2ff1
        • Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1, JobInfo(clients: Set((Actor[akka.tcp://flink@ip-10-210-42-62.ec2.internal:37564/temp/$c],DETACHED)), start: 1528833686265)).
        • Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.
        • Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).
        • Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.
        • Successfully ran initialization on master in 0 ms.
        • Job recovers via failover strategy: full graph restart
        • Running initialization on master for job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).
        • Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.
        • Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.
        • Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.
        • Recovering checkpoints from ZooKeeper.
        • Found 3 checkpoints in ZooKeeper.
        • Trying to retrieve checkpoint 69255.
        • Trying to fetch 3 checkpoints from storage.
        • Trying to retrieve checkpoint 69256.
        • Trying to retrieve checkpoint 69257.
        • Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.
        • Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).
        • Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to RUNNING.
        • Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to FAILING. org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.
      • 15:20:09 JM 1 starts to recover 61bca496065cd05e4263070a5e923a05
        • Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05, JobInfo(clients: Set((Actor[akka.tcp://flink@ip-10-210-22-167.ec2.internal:41001/temp/$c],DETACHED)), start: 1525728377900)).
        • Submitting recovered job 61bca496065cd05e4263070a5e923a05.
        • Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).
        • Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.
        • Job recovers via failover strategy: full graph restart
        • Successfully ran initialization on master in 0 ms.
        • Running initialization on master for job Some Job (61bca496065cd05e4263070a5e923a05).
        • Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.
        • Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.
        • Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.
        • Recovering checkpoints from ZooKeeper.
        • Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).
        • Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state CREATED to RUNNING.
        • Trying to fetch 0 checkpoints from storage
        • Found 0 checkpoints in ZooKeeper.
      • 15:20:09 JM 1 reports a bunch of metric collisions because of the two jobs:
        • Name collision: Group already contains a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[jobmanager, job]
        • Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[jobmanager, job]
        • etc
      • 15:20:39 JM 1 begins attempting to restart the 2a4eff355aef849c5ca37dbac04f2ff1 job repeatedly
        • Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state FAILING to RESTARTING.
        • Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).
        • Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CREATED.
        • Recovering checkpoints from ZooKeeper.
        • Found 3 checkpoints in ZooKeeper.
        • Trying to fetch 3 checkpoints from storage.
        • Trying to retrieve checkpoint 69255.
        • Trying to retrieve checkpoint 69256.
        • Trying to retrieve checkpoint 69257.
        • Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.
        • Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to RUNNING.
        • Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to FAILING.
      • 15:35:39 ZK 1 reestablishes connection with ZK 2 and 3 and becomes a follower

       

      Analysis

       

      The cluster was running job 2a4eff355aef849c5ca37dbac04f2ff1.  The JM HA leader was JM 2, which was connected to ZK 1.  ZK 1 was a follower in the ZK ensemble.  The ZK leader was ZK 2.  

      ZK 1 lost network connectivity for about 16 minutes.  When JM 2 lost its connection to ZK 1, it gave up leadership and shut down job 2a4eff355aef849c5ca37dbac04f2ff1.  JM 2 then immediately connected to ZK 2, without its ZK session having expired.  Nonetheless, because JM 2 had given up leadership, JM 1 was elected the new JM leader.

      JM 1 then erroneously decided there were two jobs to restore: the previously running job, 2a4eff355aef849c5ca37dbac04f2ff1, and 61bca496065cd05e4263070a5e923a05.  JM 1 found no checkpoints for 61bca496065cd05e4263070a5e923a05, so that job started right away.  JM 1 attempted to restore 2a4eff355aef849c5ca37dbac04f2ff1 from the latest checkpoint, but the restore failed because the other job had already started, leaving too few free task slots in the cluster.  Thus, 2a4eff355aef849c5ca37dbac04f2ff1 entered a restart loop.

      We manually stopped both jobs and started a new job based on the last known checkpoint for 2a4eff355aef849c5ca37dbac04f2ff1.

       

      Job 61bca496065cd05e4263070a5e923a05  is an old job that we ran for a month between May 7th and June 5th.
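The `start:` values in the two "Recovered SubmittedJobGraph" log lines, and the value after `@` in the "Restoring from latest valid checkpoint" line, look like Unix epoch milliseconds. That reading is an assumption, but decoding them gives dates consistent with the history above. A minimal sketch (the class and method names are illustrative only):

```java
import java.time.Instant;

final class EpochMillisDecoder {

    // Converts a Unix epoch timestamp in milliseconds to an ISO-8601 UTC string.
    static String toUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // Values copied from the log lines in the timeline.
        System.out.println("61bca... start:   " + toUtc(1525728377900L)); // 2018-05-07T21:26:17.900Z
        System.out.println("2a4ef... start:   " + toUtc(1528833686265L)); // 2018-06-12T20:01:26.265Z
        System.out.println("checkpoint 69257: " + toUtc(1532989148882L)); // 2018-07-30T22:19:08.882Z
    }
}
```

Under that assumption, the resurrected job's graph dates from May 7th, when the old job was first started, and the start time of 2a4eff355aef849c5ca37dbac04f2ff1 matches the 2018-06-12 20:01 timestamp on submittedJobGraph7f627a661cec in the listing below.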

      After our manual intervention, the HA state directory in S3 looks like this:

      s3cmd ls s3://bucket/flink/cluster_1/recovery/
      DIR s3://bucket/flink/cluster_1/recovery/some_job/
      2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5
      2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb
      2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02
      2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec
      2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c

      submittedJobGraph7f627a661cec appears to be job 2a4eff355aef849c5ca37dbac04f2ff1, the long-running job that failed during the ZK failover.

      submittedJobGraphf3767780c00c appears to be job d77948df92813a68ea6dfd6783f40e7e, the job we started from a checkpoint after shutting down the duplicate jobs.

       

      A few questions come to mind.

      Why does the JM terminate running jobs when it can immediately connect to another ZK node and its ZK session has not expired?

      This seems to be a result of using the LeaderLatch recipe in Curator.  Its docs state:

      LeaderLatch instances add a ConnectionStateListener to watch for connection problems. If SUSPENDED or LOST is reported, the LeaderLatch that is the leader will report that it is no longer the leader (i.e. there will not be a leader until the connection is re-established). If a LOST connection is RECONNECTED, the LeaderLatch will delete its previous ZNode and create a new one.

      Users of LeaderLatch must take account that connection issues can cause leadership to be lost. i.e. hasLeadership() returns true but some time later the connection is SUSPENDED or LOST. At that point hasLeadership() will return false. It is highly recommended that LeaderLatch users register a ConnectionStateListener.

      So not only is leadership lost while disconnected, but it will likely stay lost after reconnecting because the znode is deleted and recreated.

      This can probably be solved by using the Curator LeaderElection recipe instead, which states:

      The LeaderSelectorListener class extends ConnectionStateListener. When the LeaderSelector is started, it adds the listener to the Curator instance. Users of the LeaderSelector must pay attention to any connection state changes. If an instance becomes the leader, it should respond to notification of being SUSPENDED or LOST.

      If the SUSPENDED state is reported, the instance must assume that it might no longer be the leader until it receives a RECONNECTED state. If the LOST state is reported, the instance is no longer the leader and its takeLeadership method should exit.

      So with LeaderElection it seems that what to do during the SUSPENDED state is left up to the application, which may choose to continue being leader until the state becomes LOST.

      Obviously there are dangers with doing so, but at the very least you would expect the JM not to give up leadership until it tried to connect to other ZK members within the remaining ZK session timeout.
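To make the difference concrete, here is a small simulation of the two policies. It is a sketch under stated assumptions, not Curator code: `ConnState`, `ConnEvent` and both policy methods are hypothetical names invented for illustration, and only the state names mirror the connection states quoted above. The 40000 ms timeout is the negotiated session timeout from the JM 2 log. The 500 ms gap between SUSPENDED and RECONNECTED is an assumed value, since the timeline only shows both within the same second.

```java
import java.util.Arrays;
import java.util.List;

final class LeadershipPolicySketch {

    // Hypothetical model types; these are not part of Curator's API.
    enum ConnState { SUSPENDED, RECONNECTED, LOST }

    static final class ConnEvent {
        final long atMillis;   // event time relative to the start of the trace
        final ConnState state;

        ConnEvent(long atMillis, ConnState state) {
            this.atMillis = atMillis;
            this.state = state;
        }
    }

    // Strict policy: any SUSPENDED or LOST event revokes leadership at once.
    static boolean strictKeepsLeadership(List<ConnEvent> trace) {
        for (ConnEvent e : trace) {
            if (e.state != ConnState.RECONNECTED) {
                return false;
            }
        }
        return true;
    }

    // Lenient policy: leadership survives SUSPENDED as long as RECONNECTED
    // arrives before the session timeout elapses; LOST always revokes it.
    // A trace that ends while still suspended is treated as undecided (kept).
    static boolean lenientKeepsLeadership(List<ConnEvent> trace, long sessionTimeoutMillis) {
        long suspendedSince = -1; // -1 means "currently connected"
        for (ConnEvent e : trace) {
            if (e.state == ConnState.LOST) {
                return false;
            }
            if (e.state == ConnState.SUSPENDED) {
                if (suspendedSince < 0) {
                    suspendedSince = e.atMillis;
                }
            } else { // RECONNECTED
                if (suspendedSince >= 0 && e.atMillis - suspendedSince >= sessionTimeoutMillis) {
                    return false;
                }
                suspendedSince = -1;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Roughly what JM 2 saw: a brief suspension followed by a reconnect.
        List<ConnEvent> blip = Arrays.asList(
                new ConnEvent(0, ConnState.SUSPENDED),
                new ConnEvent(500, ConnState.RECONNECTED));
        long sessionTimeoutMillis = 40_000;

        System.out.println("strict keeps leadership:  " + strictKeepsLeadership(blip));
        System.out.println("lenient keeps leadership: "
                + lenientKeepsLeadership(blip, sessionTimeoutMillis));
    }
}
```

Under the strict policy the brief suspension is enough to revoke leadership, which matches what happened to JM 2. Under the lenient policy JM 2 would have remained leader, at the cost of possibly acting as leader for up to one session timeout while actually partitioned.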

      This problem has been previously discussed on the mailing list, which led to FLINK-6174 and this PR, which appears to be a modification of the Curator LeaderLatch recipe.  It also led to FLINK-5703, which seems like an attempt to keep jobs running during JM failover.  While that is a valuable addition, I argue that it is not required to avoid this failure, as the previous leader can continue being leader so long as it connects to a new ZK before its ZK session expires.

       

      Why did JM 1 resurrect the old job?

      Something must have been off with the state stored in the S3 HA recovery directory.  The job must not have been fully removed.

      In fact, right now the recovery directory has the file submittedJobGraph7f627a661cec which appears to be job 2a4eff355aef849c5ca37dbac04f2ff1. Is that expected?  That job is no longer running.  Shouldn't that file no longer exist in the recovery directory?
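Until the cause is understood, a leftover like this could be caught with a simple operator-side check that compares the number of submittedJobGraph files in the recovery directory with the number of jobs expected to be running. The sketch below is a hypothetical helper, not part of Flink. It assumes the four-column `s3cmd ls` rows shown above (date, time, size, path), takes its input from that listing, and assumes only one job (the manually restarted one) should remain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RecoveryDirCheck {

    // Returns "date name" for every file row whose file name starts with prefix.
    // Rows without the four columns date, time, size, path (such as DIR rows)
    // are skipped.
    static List<String> filesWithPrefix(List<String> listing, String prefix) {
        List<String> out = new ArrayList<>();
        for (String line : listing) {
            String[] cols = line.trim().split("\\s+");
            if (cols.length != 4) {
                continue;
            }
            String path = cols[3];
            String name = path.substring(path.lastIndexOf('/') + 1);
            if (name.startsWith(prefix)) {
                out.add(cols[0] + " " + name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Rows copied from the recovery directory listing in this report.
        List<String> listing = Arrays.asList(
                "DIR s3://bucket/flink/cluster_1/recovery/some_job/",
                "2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5",
                "2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb",
                "2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02",
                "2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec",
                "2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c");

        int expectedJobs = 1; // assumption: only the manually restarted job should remain

        List<String> graphs = filesWithPrefix(listing, "submittedJobGraph");
        for (String g : graphs) {
            System.out.println(g);
        }
        if (graphs.size() > expectedJobs) {
            System.out.println("More job graphs than expected jobs: "
                    + graphs.size() + " > " + expectedJobs);
        }
    }
}
```

With the listing above, the check reports two job graph files against one expected job, which is the discrepancy this question is about.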

       

       

       

              People

              • Assignee:
                till.rohrmann Till Rohrmann
                Reporter:
                elevy Elias Levy