Flink / FLINK-7278

Flink job can get stuck when the ZK leader is re-elected during a ZK cluster migration


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Abandoned
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      We have observed a potential failure case while a Flink job was running during a ZK migration. The scenario is described below.

      1. The Flink cluster was running in standalone mode on the Netflix Titus container runtime.
      2. We performed a ZK migration by updating each ZK node to a new OS image, one node at a time.
      3. During ZK leader re-election, the Flink cluster started to exhibit failures and eventually ended up in a non-recoverable failure mode.
      4. This behavior does not reproduce every time; it may be caused by a race condition in an edge case.

      Below is a list of error messages ordered by event time:
      2017-07-22 02:47:44,535 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Source -> Sink: Sink (67/176) (0442d63c89809ad86f38874c845ba83f) switched from RUNNING to FAILED.
      java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='f519795dfabcecfd7863ed587efdb398'} @ titus-123072-worker-3-39 (dataPort=46879)
      at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
      at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
      at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
      at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
      at org.apache.flink.runtime.instance.InstanceManager.unregisterAllTaskManagers(InstanceManager.java:234)
      at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:330)
      at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
      at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
      at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
      at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

      2017-07-22 02:47:44,621 WARN com.netflix.spaas.runtime.FlinkJobManager - Discard message LeaderSessionMessage(7a247ad9-531b-4f27-877b-df41f9019431,Disconnect(0b300c04592b19750678259cd09fea95,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because the expected leader session ID None did not equal the received leader session ID Some(7a247ad9-531b-4f27-877b-df41f9019431).
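The warning above shows the JobManager discarding a Disconnect message: it expected no leader session ID (None), which suggests it no longer considered itself the leader, while the message still carried the session ID 7a247ad9-531b-4f27-877b-df41f9019431. The Java sketch below is a simplified, hypothetical model of that comparison using java.util.Optional; it is not Flink's LeaderSessionMessageFilter, and the class and method names are invented for illustration.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical, simplified model of leader-session message filtering.
// Not Flink's LeaderSessionMessageFilter; all names here are illustrative.
public class LeaderSessionFilterSketch {

    // A message stamped with the leader session ID its sender believes is current.
    static final class LeaderSessionMessage {
        final UUID leaderSessionId; // null models a message sent without a session ID
        final String payload;

        LeaderSessionMessage(UUID leaderSessionId, String payload) {
            this.leaderSessionId = leaderSessionId;
            this.payload = payload;
        }
    }

    // Session ID this component expects; empty while it holds no leadership.
    private Optional<UUID> expectedLeaderSessionId = Optional.empty();

    void grantLeadership(UUID sessionId) {
        expectedLeaderSessionId = Optional.of(sessionId);
    }

    void revokeLeadership() {
        expectedLeaderSessionId = Optional.empty();
    }

    // Accepts the message only if its session ID equals the expected one.
    boolean accept(LeaderSessionMessage msg) {
        Optional<UUID> received = Optional.ofNullable(msg.leaderSessionId);
        if (!expectedLeaderSessionId.equals(received)) {
            System.out.println("Discard message " + msg.payload
                    + ": expected leader session ID " + expectedLeaderSessionId
                    + " did not equal received leader session ID " + received);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        LeaderSessionFilterSketch jobManager = new LeaderSessionFilterSketch();
        UUID session = UUID.randomUUID();
        LeaderSessionMessage disconnect = new LeaderSessionMessage(session, "Disconnect");

        jobManager.grantLeadership(session);
        System.out.println(jobManager.accept(disconnect)); // true: IDs match
        jobManager.revokeLeadership();                     // e.g. ZK leadership lost
        System.out.println(jobManager.accept(disconnect)); // false: expected empty, received present
    }
}
```

In this model, any message stamped with the previous session is dropped as soon as leadership is revoked, which is one way in-flight messages could be silently lost during a ZK leader re-election.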
      Zhenzhong Xu added a comment - 07/26/2017 09:24 PM
      2017-07-22 02:47:45,015 WARN netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn - Session 0x2579bebfd265054 for server 100.83.64.121/100.83.64.121:2181, unexpected error, closing socket connection and attempting reconnect
      java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
      at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
      at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
      at sun.nio.ch.IOUtil.read(IOUtil.java:192)
      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
      at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
      at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
      at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
      Zhenzhong Xu added a comment - 07/26/2017 09:25 PM
      2017-07-22 02:47:44,557 ERROR org.apache.kafka.clients.producer.KafkaProducer - Interrupted while joining ioThread
      java.lang.InterruptedException
      at java.lang.Object.wait(Native Method)
      at java.lang.Thread.join(Thread.java:1260)
      at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
      at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
      at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:320)
      at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:431)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
      at java.lang.Thread.run(Thread.java:748)

      2017-07-22 02:47:44,663 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
      org.apache.kafka.common.KafkaException: Failed to close kafka producer
      at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:732)
      at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
      at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:320)
      at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:431)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.InterruptedException
      at java.lang.Object.wait(Native Method)
      at java.lang.Thread.join(Thread.java:1260)
      at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
      ... 9 more
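Both Kafka traces point to the same sequence: while the task was disposing its operators, KafkaProducer.close() was waiting in Thread.join() for the producer's I/O thread, the task thread was interrupted, and the resulting InterruptedException surfaced as a KafkaException ("Failed to close kafka producer"). The self-contained sketch below reproduces only the underlying JDK behavior; the thread names and the wrapping RuntimeException are hypothetical stand-ins, and no Kafka client classes are involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Shows that Thread.join() throws InterruptedException when the joining thread is
// interrupted. "io-thread" is a hypothetical stand-in for a producer's I/O thread;
// no Kafka classes are used.
public class InterruptedJoinSketch {

    // Starts a long-running I/O thread, has a "closer" thread join it, interrupts
    // the closer, and returns the exception the closer recorded (null if none).
    static Throwable closeWhileInterrupted() {
        CountDownLatch stopIo = new CountDownLatch(1);
        Thread ioThread = new Thread(() -> {
            try {
                stopIo.await(); // stays alive until released, so join() must wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "io-thread");
        ioThread.setDaemon(true);
        ioThread.start();

        AtomicReference<Throwable> observed = new AtomicReference<>();
        Thread closer = new Thread(() -> {
            try {
                ioThread.join(); // analogous to close() waiting for the I/O thread
            } catch (InterruptedException e) {
                // Analogous to wrapping the interrupt in a "failed to close" exception.
                observed.set(new RuntimeException("Failed to close producer", e));
            }
        }, "closer");
        closer.start();

        closer.interrupt(); // e.g. the task thread being interrupted during disposal
        try {
            closer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            stopIo.countDown(); // release the I/O thread
        }
        return observed.get();
    }

    public static void main(String[] args) {
        Throwable t = closeWhileInterrupted();
        System.out.println(t);
        System.out.println("Caused by: " + t.getCause());
    }
}
```

The interrupt is delivered whether it arrives before or during the wait: join() keeps waiting only while the target thread is alive, and waiting with the interrupt flag already set throws immediately.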

      2017-07-22 02:47:45,079 WARN netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn - Session 0x35841491f044692 for server null, unexpected error, closing socket connection and attempting reconnect
      java.net.ConnectException: Connection refused
      at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
      at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
      at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

      2017-07-22 02:47:59,521 ERROR org.apache.flink.shaded.org.apache.curator.ConnectionState - Connection timed out for connection string (100.83.64.121:2181,100.83.104.81:2181,100.83.135.236:2181,100.83.146.196:2181,100.83.17.206:2181) and timeout (15000) / elapsed (15002)
      org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
      at org.apache.flink.shaded.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
      at org.apache.flink.shaded.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)
      at org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:806)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)

      2017-07-22 02:48:24,523 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
      netflix.spaas.shaded.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
      at netflix.spaas.shaded.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
      at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)
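The two Curator errors above show the client-side timeline: the connection attempt exceeded the 15000 ms connection timeout (elapsed 15002 ms), and roughly 25 seconds later a background operation exhausted its retry policy and gave up. The sketch below models a bounded retry loop with exponential backoff, the general technique behind retry policies such as Curator's ExponentialBackoffRetry. It is a deterministic simplification, not Curator's implementation, and the operation, baseSleepMs, and maxRetries values are hypothetical.

```java
import java.util.function.BooleanSupplier;

// Simplified model of a bounded retry loop with exponential backoff.
// Not Curator's code; baseSleepMs and maxRetries are illustrative parameters.
public class BoundedRetrySketch {

    // Result of running an operation under the retry policy.
    static final class Outcome {
        final boolean succeeded;
        final int attempts;

        Outcome(boolean succeeded, int attempts) {
            this.succeeded = succeeded;
            this.attempts = attempts;
        }
    }

    // Sleep before retry number retryCount (0-based): baseSleepMs * 2^retryCount.
    static long backoffMs(long baseSleepMs, int retryCount) {
        return baseSleepMs << retryCount;
    }

    // Runs the operation once, then retries up to maxRetries times with backoff.
    // Gives up once retries are exhausted (compare "Background operation retry gave up").
    static Outcome runWithRetry(BooleanSupplier operation, long baseSleepMs, int maxRetries) {
        int attempts = 0;
        for (int retry = 0; ; retry++) {
            attempts++;
            if (operation.getAsBoolean()) {
                return new Outcome(true, attempts);
            }
            if (retry >= maxRetries) {
                return new Outcome(false, attempts); // retries exhausted: give up
            }
            try {
                Thread.sleep(backoffMs(baseSleepMs, retry));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return new Outcome(false, attempts);
            }
        }
    }

    public static void main(String[] args) {
        // An operation that always fails, like a ZK call while no server is reachable.
        Outcome o = runWithRetry(() -> false, 10, 3);
        System.out.println("succeeded=" + o.succeeded + " attempts=" + o.attempts);
        // prints "succeeded=false attempts=4"
    }
}
```

With maxRetries = 3, an operation that never succeeds is attempted four times (one initial call plus three retries) before the loop gives up, which is analogous to the "retry gave up" error above.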

      2017-07-22 02:49:34,592 ERROR org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource manager could not register at JobManager
      akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/jobmanager)]] after [10000 ms]
      at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
      at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
      at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
      at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
      at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
      at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
      at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
      at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
      at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
      at java.lang.Thread.run(Thread.java:748)
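The final error shows the StandaloneResourceManager failing to register at the JobManager: the request was sent with Akka's ask pattern to /user/jobmanager and failed with AskTimeoutException because no reply arrived within 10000 ms, possibly because the JobManager was no longer acting as leader. The sketch below models the same request-with-deadline idea using CompletableFuture.orTimeout (Java 9 or later) instead of Akka; askRegistration and the simulated responder are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Request/reply with a deadline, loosely analogous to Akka's ask pattern.
// Uses only java.util.concurrent; askRegistration and the responder are hypothetical.
public class AskTimeoutSketch {

    // Sends a "registration" and fails with TimeoutException if no reply arrives in time.
    static CompletableFuture<String> askRegistration(boolean responderAlive, long timeoutMs) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (responderAlive) {
            // A live responder completes the future asynchronously.
            CompletableFuture.runAsync(() -> reply.complete("RegistrationAcknowledged"));
        }
        // A responder that never answers leaves the future incomplete until the deadline.
        return reply.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Blocks for the outcome and describes it.
    static String outcome(CompletableFuture<String> f) {
        try {
            return "reply: " + f.get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TimeoutException) {
                return "timed out";
            }
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(askRegistration(true, 1000))); // reply: RegistrationAcknowledged
        System.out.println(outcome(askRegistration(false, 100))); // timed out
    }
}
```

As with ask, the caller only learns that no answer arrived in time; it cannot tell whether the responder is dead, busy, or discarding the message.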


          People

            Assignee: Unassigned
            Reporter: Zhenzhong Xu
            Votes: 0
            Watchers: 3
