FLINK-6641

HA recovery on YARN: ClusterClient calls HighAvailabilityServices#closeAndCleanupAll

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.4.0
    • Fix Version/s: 1.3.0
    • Labels: None

      Description

      While testing the 1.3 RC1, I was unable to recover from a JobManager failure.

      The following error appears in the log of the JobManager started by YARN:

      2017-05-19 14:41:48,133 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting recovered job 35df3b24209e9ebea12407e6747a746c.
      2017-05-19 14:41:48,133 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting job 35df3b24209e9ebea12407e6747a746c (CarTopSpeedWindowingExample) (Recovery).
      2017-05-19 14:41:48,139 ERROR org.apache.flink.yarn.YarnJobManager                          - Failed to submit job 35df3b24209e9ebea12407e6747a746c (CarTopSpeedWindowingExample)
      org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 935fb4229e27f166d5ba8912cd47974ad436af20
      	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1267)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:517)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
      	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
      	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.io.IOException: Cannot get library with hash 935fb4229e27f166d5ba8912cd47974ad436af20
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:264)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:117)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:89)
      	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1262)
      	... 25 more
      Caused by: java.io.IOException: Failed to copy from blob store.
      	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:387)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:255)
      	... 28 more
      Caused by: java.io.FileNotFoundException: File does not exist: /shared/recovery-dir/application_1494870922226_0061/blob/cache/blob_935fb4229e27f166d5ba8912cd47974ad436af20
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1963)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1904)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1884)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1856)
      	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:568)
      	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:363)
      	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
      	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:415)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
      	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
      
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
      	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
      	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1133)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1121)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1111)
      	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:272)
      	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:239)
      	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:232)
      	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1279)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:296)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:292)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:292)
      	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:765)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
      	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:106)
      	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:87)
      	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:384)
      	... 29 more
      Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /shared/recovery-dir/application_1494870922226_0061/blob/cache/blob_935fb4229e27f166d5ba8912cd47974ad436af20
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1963)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1904)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1884)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1856)
      	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:568)
      	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:363)
      	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
      	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:415)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
      	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
      
      	at org.apache.hadoop.ipc.Client.call(Client.java:1406)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1359)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
      	at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:206)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      	at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1131)
      	... 45 more
      

The problem is that the client is shutting down (due to other issues) and, during that phase, it deletes the BlobServer files in HDFS.
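
Below is a minimal sketch of that failure mode. All class and path names here are illustrative stand-ins, not Flink's actual 1.3 classes: the point is that the client's shutdown path recursively deletes the shared HA blob directory that a recovering JobManager still needs to read from.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Comparator;
    import java.util.stream.Stream;

    class HaBlobCleanupSketch {
        // Hypothetical stand-in for the shared HA directory, e.g.
        // hdfs:///shared/recovery-dir/<applicationId>/blob
        static final Path HA_BLOB_DIR = Paths.get("/shared/recovery-dir/app/blob");

        // What the client's shutdown path effectively did: recursively delete
        // the shared blob directory, even though the job was not terminally done.
        static void clientShutdownWithCleanup() throws IOException {
            try (Stream<Path> files = Files.walk(HA_BLOB_DIR)) {
                files.sorted(Comparator.reverseOrder())
                     .map(Path::toFile)
                     .forEach(java.io.File::delete);
            }
        }

        // What the recovering JobManager tries afterwards: fetch the user-code
        // jar by blob key, which now fails with FileNotFoundException.
        static byte[] recoverUserCodeJar(String blobKey) throws IOException {
            return Files.readAllBytes(HA_BLOB_DIR.resolve("cache/blob_" + blobKey));
        }
    }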

        Activity

        till.rohrmann Till Rohrmann added a comment -

        I think one problem is that the ClusterClient calls HighAvailabilityServices#closeAndCleanupAll. The other problem is probably that the YarnClusterClient deletes the session directory, which also acts as the YARN application working directory.
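
        A minimal sketch of the distinction behind this diagnosis, using hypothetical stand-in types (HighAvailabilityServicesSketch and ClusterClientSketch are not Flink classes; the method names follow the ticket and are not guaranteed to match Flink's actual API): a plain close must only release local handles, while the cleanup variant additionally deletes shared HA data and is safe only once the job is terminally finished.

            // Hypothetical stand-in for Flink's HA services interface.
            interface HighAvailabilityServicesSketch extends AutoCloseable {
                /** Release local handles only; shared HA data (blobs,
                 *  checkpoint pointers) survives for a recovering JobManager. */
                @Override
                void close() throws Exception;

                /** Additionally delete all shared HA data. Safe only once no
                 *  JobManager may still need it for recovery. */
                void closeAndCleanupAll() throws Exception;
            }

            class ClusterClientSketch {
                private final HighAvailabilityServicesSketch haServices;

                ClusterClientSketch(HighAvailabilityServicesSketch haServices) {
                    this.haServices = haServices;
                }

                // Pre-fix behavior: full cleanup on every client shutdown,
                // which deletes the blobs a recovering JobManager still needs.
                void shutDownBeforeFix() throws Exception {
                    haServices.closeAndCleanupAll();
                }

                // Post-fix behavior: a plain close leaves the HA data in place.
                void shutDownAfterFix() throws Exception {
                    haServices.close();
                }
            }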

        rmetzger Robert Metzger added a comment -

        Yes, I agree. This JIRA is actually two issues.

        Let me create another one and rename this one (this one becomes the HighAvailabilityServices#closeAndCleanupAll one).

        till.rohrmann Till Rohrmann added a comment -

        1.4.0: b93396c5a48c604c403dc383cf34eba3d446642e
        1.3.0: c15f37397822d8df0ebcbea30f94c669374432b7

        aljoscha Aljoscha Krettek added a comment -

        Reopening to set the fix version.


          People

          • Assignee: till.rohrmann Till Rohrmann
          • Reporter: rmetzger Robert Metzger
          • Votes: 0
          • Watchers: 3
