FLINK-33522: Savepoint upgrade mode fails despite the savepoint succeeding



    Description

      Under certain circumstances, savepoint creation can succeed but the job fails afterwards. One example is when the source coordinator is still distributing operator events (e.g. AddSplitEvents) to tasks that have already finished. This is possibly a Flink bug, although it is not yet clear how to solve it.
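
      For context, this failure occurs with the operator's savepoint upgrade mode, where a spec change is rolled out by stopping the job with a savepoint and redeploying it from that savepoint. A minimal FlinkDeployment sketch of that setup (names, image, and paths are illustrative and not taken from this report):

      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      metadata:
        name: example-job                               # hypothetical name
      spec:
        image: flink:1.17
        flinkVersion: v1_17
        serviceAccount: flink
        flinkConfiguration:
          state.savepoints.dir: s3://bucket/savepoints  # illustrative location
        jobManager:
          resource:
            memory: "2048m"
            cpu: 1
        taskManager:
          resource:
            memory: "2048m"
            cpu: 1
        job:
          jarURI: local:///opt/flink/usrlib/job.jar     # hypothetical jar
          parallelism: 2
          upgradeMode: savepoint   # spec changes are applied via stop-with-savepoint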

      After the savepoint has succeeded, Flink fails the job like this:

      Source (1/2) (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
      
      An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', targetTask: Source (1/2) - execution #0
      Caused by:
      org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FINISHED
         at org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
         at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
      

      Inside the Kubernetes operator, this is processed as:

      java.util.concurrent.CompletionException: org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException: A savepoint has been created at: s3://..., but the corresponding job 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is consistent, but might have uncommitted transactions. If you want to commit the transaction please restart a job from this savepoint. 
      
                java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
                java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
                org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319) 
                org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121) 
                org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223) 
                org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122) 
               org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
                org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136) 
                org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56) 
                io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138) 
                io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96) 
                org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80) 
                io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95) 
                io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139) 
                io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119) 
                io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89) 
                io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62) 
                io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414) 
                java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
                java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
                java.lang.Thread.run(Thread.java:829) 
      

      Subsequently, we get the following error because the HA metadata is no longer available; it was cleaned up after the terminal job failure:

      org.apache.flink.kubernetes.operator.exception.RecoveryFailureException: HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted.
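
      The "HA metadata" mentioned above refers to the Kubernetes HA ConfigMaps that the operator relies on for last-state recovery. They only exist when high availability is enabled, roughly via configuration like the following sketch (keys are standard Flink options; values are illustrative, not from this report):

      flinkConfiguration:
        high-availability.type: kubernetes           # newer key; older versions use "high-availability: kubernetes"
        high-availability.storageDir: s3://bucket/ha # illustrative path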
      

      The deployment needs to be manually restored from a savepoint.
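
      A sketch of that manual restore with the operator, assuming the standard initialSavepointPath mechanism: delete the failed FlinkDeployment and recreate it with the savepoint pinned in the spec, roughly as below (field values are illustrative; the actual path is the one reported by the StopWithSavepointStoppingException above):

      spec:
        job:
          jarURI: local:///opt/flink/usrlib/job.jar   # hypothetical jar
          upgradeMode: savepoint
          initialSavepointPath: s3://...              # savepoint path from the exception above
          state: running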

    People

      Assignee: Maximilian Michels (mxm)
      Reporter: Maximilian Michels (mxm)