FLINK-4182

HA recovery not working properly under ApplicationMaster failures.


Details

    Description

      When TaskManager and ApplicationMaster processes are killed at random, a job sometimes fails to recover properly in HA mode.

      The symptoms vary. For example, in one case the job dies with the following exception:

       The program finished with the following exception:
      
      org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
      	at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
      	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
      	at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
      	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
      	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
      	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
      	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
      	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
      Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
      	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
      	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
      	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.io.IOException: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:257)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:116)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:88)
      	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1084)
      	... 26 more
      Caused by: java.io.IOException: Failed to copy from blob store.
      	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:358)
      	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:248)
      	... 29 more
      Caused by: java.io.IOException: gs:///flink/recovery/blob/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0 does not exist.
      	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:121)
      	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:93)
      	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:355)
      	... 30 more
      

      In other cases, I noticed inconsistencies in the results when testing with a streaming state machine job and a Kafka source. My guess is that the value state is not restored properly, because all invalid transactions in the log start from the initial state, which is the default value for the value state.
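
The hypothesis above can be illustrated with a minimal sketch in plain Java, with no Flink dependency. It is not the actual test job: the class `LostStateSketch`, the three-state transition table, and the `HashMap` standing in for keyed value state are all hypothetical. It only shows why losing the state on recovery would make every reported invalid transition start from the initial (default) state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LostStateSketch {
    // Hypothetical states; INITIAL plays the role of the value state's default value.
    enum State { INITIAL, A, B }

    // Hypothetical transition table: INITIAL -> A, A -> B, B -> A.
    static boolean isValid(State from, State to) {
        switch (from) {
            case INITIAL: return to == State.A;
            case A:       return to == State.B;
            case B:       return to == State.A;
            default:      return false;
        }
    }

    // Applies the events for one key and returns the "from" state of every
    // invalid transition. A missing entry falls back to INITIAL, mimicking
    // a value state that returns its default when nothing was restored.
    static List<State> invalidFromStates(Map<String, State> keyedState,
                                         String key, State[] events) {
        List<State> invalidFrom = new ArrayList<>();
        for (State next : events) {
            State current = keyedState.getOrDefault(key, State.INITIAL);
            if (!isValid(current, next)) {
                invalidFrom.add(current);
            }
            keyedState.put(key, next);
        }
        return invalidFrom;
    }

    public static void main(String[] args) {
        // Before the failure: the key moves INITIAL -> A.
        Map<String, State> beforeFailure = new HashMap<>();
        invalidFromStates(beforeFailure, "k", new State[] {State.A});

        State[] afterRecovery = {State.B, State.A};

        // Case 1: state restored correctly, so A -> B -> A is valid.
        Map<String, State> restored = new HashMap<>(beforeFailure);
        System.out.println(invalidFromStates(restored, "k", afterRecovery));

        // Case 2: state lost on recovery, so the first event is checked
        // against INITIAL and INITIAL -> B is reported as invalid.
        Map<String, State> lost = new HashMap<>();
        System.out.println(invalidFromStates(lost, "k", afterRecovery));
    }
}
```

With correctly restored state the sketch reports no invalid transitions, while with lost state the only invalid transition it reports starts from `INITIAL`, which matches the pattern described in the log.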

            People

              Assignee: Unassigned
              Reporter: Stefan Richter (srichter)