FLINK-8943: Jobs will not recover if DFS is temporarily unavailable


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.4.2, 1.5.0, 1.6.0
    • Fix Version/s: 1.5.0
    • Component/s: Runtime / Coordination

    Description

      Job graphs are recovered from the DFS only once. If the DFS is unavailable at the time of the recovery attempt, the jobs will simply not be running until the master is restarted again.
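
      For illustration, a minimal sketch of the single-shot recovery pattern described above. The class, interface, and method names below are hypothetical stand-ins, not Flink's actual Dispatcher or ZooKeeperSubmittedJobGraphStore code: each stored job graph is read from the DFS exactly once, and a failed read is only logged, so the job stays unrecovered.

      import java.io.IOException;
      import java.util.List;
      import java.util.logging.Logger;

      /** Hypothetical sketch of single-shot job recovery; not Flink's actual code. */
      public class SingleShotRecovery {

          private static final Logger LOG = Logger.getLogger(SingleShotRecovery.class.getName());

          /** Stand-in for the HA store that reads serialized job graphs from the DFS. */
          interface JobGraphStore {
              byte[] recoverJobGraph(String jobId) throws IOException;
          }

          /**
           * Attempts to recover each job exactly once. If the DFS is unreachable when
           * recoverJobGraph is called, the error is only logged and the job stays down
           * until the master process is restarted by hand.
           */
          static void recoverJobs(JobGraphStore store, List<String> jobIds) {
              for (String jobId : jobIds) {
                  try {
                      byte[] serializedJobGraph = store.recoverJobGraph(jobId);
                      LOG.info("Recovered job " + jobId + " (" + serializedJobGraph.length + " bytes)");
                  } catch (IOException e) {
                      // The job silently drops out of the running set -- this is the reported bug.
                      LOG.severe("Could not recover the job graph for " + jobId + ": " + e.getMessage());
                  }
              }
          }
      }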

      Steps to reproduce

      1. Submit a job on a Flink cluster with HDFS as the HA storage directory.
      2. Trigger job recovery by killing the master.
      3. Stop the HDFS NameNode.
      4. Re-enable the HDFS NameNode after job recovery is over.
      5. Verify that the job is not running.

      Expected behavior
      The new master should either fail fast and exit, or re-attempt the recovery.
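
      A hedged sketch of what either behavior could look like, again with hypothetical names rather than Flink's real API: the recovery error is either escalated so that the master process exits (an external supervisor restarts it, which re-attempts the recovery), or the DFS read is retried a bounded number of times before giving up.

      import java.io.IOException;
      import java.util.concurrent.TimeUnit;

      /** Hypothetical sketch of the expected recovery behavior; not Flink's implementation. */
      public class FailFastOrRetryRecovery {

          /** Stand-in for the HA store that reads a serialized job graph from the DFS. */
          interface JobGraphStore {
              byte[] recoverJobGraph(String jobId) throws IOException;
          }

          /** Option 1: fail fast -- escalate the error so the master exits and gets restarted. */
          static byte[] recoverOrDie(JobGraphStore store, String jobId) {
              try {
                  return store.recoverJobGraph(jobId);
              } catch (IOException e) {
                  // Treat an unrecoverable job graph as fatal instead of logging and moving on.
                  System.err.println("Fatal: could not recover job " + jobId + ", terminating master.");
                  System.exit(1);
                  return null; // unreachable
              }
          }

          /** Option 2: retry the DFS read a bounded number of times with a fixed backoff. */
          static byte[] recoverWithRetry(JobGraphStore store, String jobId, int maxAttempts, long backoffMillis)
                  throws IOException, InterruptedException {
              IOException lastFailure = null;
              for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                  try {
                      return store.recoverJobGraph(jobId);
                  } catch (IOException e) {
                      lastFailure = e;
                      TimeUnit.MILLISECONDS.sleep(backoffMillis);
                  }
              }
              throw lastFailure; // give up after maxAttempts; the caller decides whether this is fatal
          }
      }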

      Stacktrace

      2018-03-14 14:01:37,704 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Could not recover the job graph for a41d50b6f3ac16a730dd12792a847c97.
      org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
      	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192)
      	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
      	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
      	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
      	at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
      	at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:843)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:832)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:821)
      	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:325)
      	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:285)
      	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:270)
      	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1132)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:325)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
      	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
      	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
      	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)
      	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)
      	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:186)
      	... 7 more
      Caused by: java.net.ConnectException: Connection refused
      	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
      	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
      	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
      	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
      	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
      	at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
      	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1381)
      	... 40 more
      


          People

            Assignee: Till Rohrmann (trohrmann)
            Reporter: Gary Yao (gjy)
            Votes: 0
            Watchers: 5

            Dates

              Created:
              Updated:
              Resolved:
