Flink / FLINK-11225

Inconsistent state of addedJobGraphs when the Dispatcher's leadership is revoked and granted concurrently

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.6.2
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels: None
    • Environment: Flink 1.6.2 on YARN

    Description

      In some cases the Dispatcher has its leadership revoked and then immediately granted again, for example when the application master goes through a long full GC pause. This can cause Dispatcher 'revokeLeadership' and 'grantLeadership' to run concurrently. In ZooKeeperSubmittedJobGraphStore, 'recoverJobGraph' may then happen before 'releaseJobGraph', so that addedJobGraphs in ZooKeeperSubmittedJobGraphStore no longer contains the running job. Later, when we cancel the running job, the job graph cannot be removed from ZooKeeper at $basedir/jobgraphs/$job_id. If the application master restarts, it will recover the cancelled job.
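
The interleaving described above can be illustrated with a small toy model. This is only a sketch, not Flink code: `ToyJobGraphStore`, `putJobGraph`, `existsInZooKeeper` and `leaksAfterCancel` are hypothetical names, and the model assumes that the store deletes the ZooKeeper node on removal only when addedJobGraphs still contains the job ID, which is what the behaviour reported here suggests. The two orderings are replayed sequentially rather than with real threads so that the outcome is deterministic.

```java
import java.util.HashSet;
import java.util.Set;

public class JobGraphStoreRace {

    /** Toy stand-in for ZooKeeperSubmittedJobGraphStore (hypothetical, not Flink code). */
    static final class ToyJobGraphStore {
        // Stands in for the nodes under $basedir/jobgraphs in ZooKeeper.
        private final Set<String> zkJobGraphs = new HashSet<>();
        // Stands in for the in-memory addedJobGraphs bookkeeping.
        private final Set<String> addedJobGraphs = new HashSet<>();

        synchronized void putJobGraph(String jobId) {
            zkJobGraphs.add(jobId);
            addedJobGraphs.add(jobId);
        }

        // Called on the grantLeadership path: re-registers a persisted job.
        synchronized void recoverJobGraph(String jobId) {
            if (zkJobGraphs.contains(jobId)) {
                addedJobGraphs.add(jobId);
            }
        }

        // Called on the revokeLeadership path: forgets the job locally.
        synchronized void releaseJobGraph(String jobId) {
            addedJobGraphs.remove(jobId);
        }

        // Assumption: removal is skipped when the job is not tracked locally.
        synchronized void removeJobGraph(String jobId) {
            if (addedJobGraphs.contains(jobId)) {
                zkJobGraphs.remove(jobId);
                addedJobGraphs.remove(jobId);
            }
        }

        synchronized boolean existsInZooKeeper(String jobId) {
            return zkJobGraphs.contains(jobId);
        }
    }

    /** Returns true if the job graph is still in "ZooKeeper" after the job is cancelled. */
    static boolean leaksAfterCancel(boolean recoverBeforeRelease) {
        String jobId = "2a16bfa299b56432e1141df3b1361fbc";
        ToyJobGraphStore store = new ToyJobGraphStore();
        store.putJobGraph(jobId);

        if (recoverBeforeRelease) {
            // Problematic order: the new leadership recovers first,
            // then the late release from the old leadership wipes the entry.
            store.recoverJobGraph(jobId);
            store.releaseJobGraph(jobId);
        } else {
            // Expected order: release finishes before recovery starts.
            store.releaseJobGraph(jobId);
            store.recoverJobGraph(jobId);
        }

        store.removeJobGraph(jobId); // the user cancels the job
        return store.existsInZooKeeper(jobId);
    }

    public static void main(String[] args) {
        System.out.println("release then recover leaks: " + leaksAfterCancel(false));
        System.out.println("recover then release leaks: " + leaksAfterCancel(true));
    }
}
```

In this model only the "recover then release" order leaves the job graph behind, which matches the symptom that a restarted application master later tries to recover a job that was already cancelled.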

       Example case, application master log:

      2018-12-08 21:12:03,729 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn[main-SendThread(******:2181)] - Client session timed out, have not heard from server in 40082ms for sessionid 0x1657682ceee6082, closing socket connection and attempting reconnect
      2018-12-08 21:12:03,978 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager[main-EventThread] - State change: SUSPENDED

      2018-12-08 21:12:03,980 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[Curator-ConnectionStateManager-0] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
      2018-12-08 21:12:03,980 WARN org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[Curator-ConnectionStateManager-0] - Connection to ZooKeeper suspended. The contender http://**:** no longer participates in the leader election.
      2018-12-08 21:12:03,980 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[Curator-ConnectionStateManager-0] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
      2018-12-08 21:12:03,981 WARN org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[Curator-ConnectionStateManager-0] - Connection to ZooKeeper suspended. The contender akka.tcp://flink@bjm7-lc453.jxq:44815/user/resourcemanager no longer participates in the leader election.
      2018-12-08 21:12:03,982 INFO org.apache.flink.runtime.jobmaster.JobManagerRunner [Curator-ConnectionStateManager-0] - JobManager for job job_*** (2a16bfa299b56432e1141df3b1361fbc) was revoked leadership at akka.tcp://flink@***:**/user/jobmanager_0.
      2018-12-08 21:12:03,984 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[flink-akka.actor.default-dispatcher-186] - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.

      2018-12-08 21:12:03,986 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [Curator-ConnectionStateManager-0] - http://***:** lost leadership
      2018-12-08 21:12:03,986 INFO org.apache.flink.yarn.YarnResourceManager [flink-akka.actor.default-dispatcher-287] - ResourceManager akka.tcp://flink@***:*/user/resourcemanager was *revoked leadership. Clearing fencing token.
      2018-12-08 21:12:03,986 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[flink-akka.actor.default-dispatcher-287] - Stopping ZooKeeperLeaderRetrievalService /leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock.

      2018-12-08 21:12:03,990 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher    [flink-akka.actor.default-dispatcher-281]  - Dispatcher akka.tcp://flink@bjm7-lc453.jxq:44815/user/dispatcher was revoked leadership.

      2018-12-08 21:12:03,990 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher    [flink-akka.actor.default-dispatcher-281]  - Stopping all currently running jobs of dispatcher akka.tcp://flink@***:**/user/dispatcher.

      2018-12-08 21:12:04,181 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn[main-SendThread(10.54.33.12:2181)] - Session establishment complete on server ***/***:2181, sessionid = 0x1657682ceee6082, negotiated timeout = 60000
      2018-12-08 21:12:04,181 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager[main-EventThread] - State change: RECONNECTED

      2018-12-08 21:12:04,188 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [main-EventThread] - http://bjm7-lc453.jxq:45684 was granted leadership with leaderSessionID=43f8a4f4-d3a6-48fb-afef-1f2f03ad5626
      2018-12-08 21:12:04,188 INFO org.apache.flink.yarn.YarnResourceManager [flink-akka.actor.default-dispatcher-281] - ResourceManager akka.tcp://flink@bjm7-lc453.jxq:44815/user/resourcemanager was granted leadership with fencing token acacde8ee4e115851f872189e7064971
      2018-12-08 21:12:04,188 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager[flink-akka.actor.default-dispatcher-281] - Starting the SlotManager.
      2018-12-08 21:12:04,190 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [flink-akka.actor.default-dispatcher-218] - Dispatcher akka.tcp://flink@***:**/user/dispatcher was granted leadership with fencing token 8fc420d6-5526-41b0-ac0f-881437c55919
      2018-12-08 21:12:04,190 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [flink-akka.actor.default-dispatcher-276] - Recovering all persisted jobs.

      2018-12-08 21:12:04,624 INFO org.apache.flink.runtime.jobmaster.JobMaster [flink-akka.actor.default-dispatcher-186] - Stopping the JobMaster for job job_***(2a16bfa299b56432e1141df3b1361fbc).
      2018-12-08 21:12:04,648 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore[flink-akka.actor.default-dispatcher-276] - Recovered SubmittedJobGraph(2a16bfa299b56432e1141df3b1361fbc, null).
      2018-12-08 21:12:04,665 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool [flink-akka.actor.default-dispatcher-242] - Stopping SlotPool.
      2018-12-08 21:12:04,673 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[flink-akka.actor.default-dispatcher-186] - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock'}.

      2018-12-08 21:12:08,283 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore[flink-akka.actor.default-dispatcher-218]  - Released locks of job graph 2a16bfa299b56432e1141df3b1361fbc from ZooKeeper.

       

      At this point, addedJobGraphs in ZooKeeperSubmittedJobGraphStore does not contain the running job 2a16bfa299b56432e1141df3b1361fbc.
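
The ordering can be read directly from the log timestamps above: the "Recovered SubmittedJobGraph" line is logged at 21:12:04,648 and the "Released locks of job graph" line at 21:12:08,283. As a minimal sketch (the class and method names `LogOrderCheck` and `millisBetween` are made up for illustration), the gap can be computed like this:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogOrderCheck {

    // Timestamp layout used by the log lines quoted in this issue.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds from the first timestamp to the second; negative if the order is reversed. */
    static long millisBetween(String first, String second) {
        LocalDateTime a = LocalDateTime.parse(first, LOG_TIME);
        LocalDateTime b = LocalDateTime.parse(second, LOG_TIME);
        return Duration.between(a, b).toMillis();
    }

    public static void main(String[] args) {
        String recovered = "2018-12-08 21:12:04,648"; // "Recovered SubmittedJobGraph(...)"
        String released = "2018-12-08 21:12:08,283";  // "Released locks of job graph ..."
        long gap = millisBetween(recovered, released);
        // A positive gap means the release was logged after the recovery.
        System.out.println("release logged " + gap + " ms after recovery");
    }
}
```

A positive result means the release from the revoked leadership was logged after the recovery for the newly granted leadership, which is the problematic order.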

      Later the job is cancelled, but when the application master restarts, it tries to recover the cancelled job and fails. Application master log:

       

      2018-12-08 22:02:30,160 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [flink-akka.actor.default-dispatcher-670] - Fatal error occurred in the cluster entrypoint.
      java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
      at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
      at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
      at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
      at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
      at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
      at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
      at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
      at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
      ... 7 more
      Caused by: java.lang.Exception: Cannot set up the user code libraries: File does not exist: /home/flink/data/flink/state/*/*/zk/*/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
      at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
      at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
      at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
      at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
      at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)

      at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:134)
      ... 10 more
      Caused by: java.io.FileNotFoundException: File does not exist: /home/flink/data/flink/state/**/**/zk/**/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
      at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
      at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
      at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
      at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
      at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)

      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
      at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
      at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
      at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1199)
      at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1189)
      at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:275)
      at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:242)
      at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:235)
      at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1487)
      at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
      at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
      at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
      at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
      at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:120)
      at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:37)
      at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:102)
      at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:84)
      at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:493)
      at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
      at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:417)
      at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
      at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:91)
      at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:131)
      ... 10 more
      Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /home/flink/data/flink/state/*/*/zk/**/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef

      at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
      at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
      at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
      at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
      at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)

      at org.apache.hadoop.ipc.Client.call(Client.java:1470)
      at org.apache.hadoop.ipc.Client.call(Client.java:1401)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
      at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
      at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1209)
      ... 31 more

       

            People

              Assignee: Unassigned
              Reporter: dongtingting