Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6641

HA recovery on YARN: ClusterClient calls HighAvailabilityServices#closeAndCleanupAll

    XMLWordPrintableJSON

Details

    Description

      While testing the 1.3 RC1, I was unable to recovery from a JobManager failure.

      The following error message appears in the JobManager that is started by YARN:

      2017-05-19 14:41:48,133 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting recovered job 35df3b24209e9ebea12407e6747a746c.
      2017-05-19 14:41:48,133 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting job 35df3b24209e9ebea12407e6747a746c (CarTopSpeedWindowingExample) (Recovery).
      2017-05-19 14:41:48,139 ERROR org.apache.flink.yarn.YarnJobManager                          - Failed to submit job 35df3b24209e9ebea12407e6747a746c (CarTopSpeedWindowingExample)
      org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 935fb4229e27f166d5ba8912cd47974ad436af20
      	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1267)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:517)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
      	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
      	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.io.IOException: Cannot get library with hash 935fb4229e27f166d5ba8912cd47974ad436af20
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:264)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:117)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:89)
      	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1262)
      	... 25 more
      Caused by: java.io.IOException: Failed to copy from blob store.
      	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:387)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:255)
      	... 28 more
      Caused by: java.io.FileNotFoundException: File does not exist: /shared/recovery-dir/application_1494870922226_0061/blob/cache/blob_935fb4229e27f166d5ba8912cd47974ad436af20
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1963)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1904)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1884)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1856)
      	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:568)
      	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:363)
      	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
      	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:415)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
      	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
      
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
      	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
      	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1133)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1121)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1111)
      	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:272)
      	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:239)
      	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:232)
      	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1279)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:296)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:292)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:292)
      	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:765)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
      	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:106)
      	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:87)
      	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:384)
      	... 29 more
      Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /shared/recovery-dir/application_1494870922226_0061/blob/cache/blob_935fb4229e27f166d5ba8912cd47974ad436af20
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1963)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1904)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1884)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1856)
      	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:568)
      	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:363)
      	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
      	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:415)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
      	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
      
      	at org.apache.hadoop.ipc.Client.call(Client.java:1406)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1359)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
      	at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:206)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      	at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1131)
      	... 45 more
      

      The problem is that the Client is shutting down (due to other issues), and during that phase it's deleting the blob server files in HDFS.

      Attachments

        Activity

          People

            trohrmann Till Rohrmann
            rmetzger Robert Metzger
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: