Flink / FLINK-30268

HA metadata and other cluster submission related errors should not throw DeploymentFailedException


Details

    Description

      Currently, most critical cluster submission errors, as well as the checks that validate HA metadata before deployment, end up throwing DeploymentFailedException.

      This puts the operator into an inconsistent state and hides the original error in subsequent reconciliation loops:

      flink-kubernetes-operator 2022-12-01 21:55:03,978 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info    | UPGRADING       | The resource is being upgraded 
      flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Info    | SUBMIT          | Starting deployment
      flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.s.AbstractFlinkService [INFO ][default/basic-checkpoint-ha-example] Deploying application cluster requiring last-state from HA metadata
      flink-kubernetes-operator 2022-12-01 21:55:03,997 o.a.f.k.o.c.FlinkDeploymentController [ERROR][default/basic-checkpoint-ha-example] Flink Deployment failed
      flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.validateHaMetadataExists(AbstractFlinkService.java:844)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:195)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
      flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
      flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
      flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown Source)
      flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Warning | RESTOREFAILED   | HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
      flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-checkpoint-ha-example] End of reconciliation
      flink-kubernetes-operator 2022-12-01 21:55:04,054 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error   | UPGRADING       | {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]} 
      flink-kubernetes-operator 2022-12-01 21:55:19,056 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-checkpoint-ha-example] Starting reconciliation
      flink-kubernetes-operator 2022-12-01 21:55:19,058 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][default/basic-checkpoint-ha-example] UPGRADE change(s) detected (FlinkDeploymentSpec[job.state=RUNNING] differs from FlinkDeploymentSpec[job.state=SUSPENDED]), starting reconciliation.
      flink-kubernetes-operator 2022-12-01 21:55:19,092 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info    | UPGRADING       | The resource is being upgraded 
      flink-kubernetes-operator 2022-12-01 21:55:19,119 o.a.f.k.o.r.d.ApplicationReconciler [ERROR][default/basic-checkpoint-ha-example] Invalid status for deployment: FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=CarTopSpeedWindowingExample, jobId=8d5c59b7e960984cd845b9977754d2ef, state=RECONCILING, startTime=1669931677233, updateTime=1669931696153, savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=null, triggerTimestamp=null, triggerType=null, formatType=null, savepointHistory=[], lastPeriodicSavepointTimestamp=0)), error=null), clusterInfo={flink-version=1.15.2, flink-revision=69e8126 @ 2022-08-17T14:58:06+02:00}, jobManagerDeploymentStatus=ERROR, reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1669931719059, lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/TopSpeedWindowing.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"last-state","allowNonRestoredState":null},"restartNonce":2,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":5},"firstDeployment":false}}, lastStableSpec=null, state=UPGRADING)), taskManager=TaskManagerInfo(labelSelector=, replicas=0))
      flink-kubernetes-operator 2022-12-01 21:55:19,133 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Warning | CLUSTERDEPLOYMENTEXCEPTION | This indicates a bug...
      flink-kubernetes-operator 2022-12-01 21:55:19,136 o.a.f.k.o.r.ReconciliationUtils [WARN ][default/basic-checkpoint-ha-example] Attempt count: 0, last attempt: false
      flink-kubernetes-operator 2022-12-01 21:55:19,163 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error   | UPGRADING       | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException: This indicates a bug...","throwableList":[{"type":"java.lang.RuntimeException","message":"This indicates a bug..."}]} 
      flink-kubernetes-operator 2022-12-01 21:55:19,164 i.j.o.p.e.ReconciliationDispatcher [ERROR][default/basic-checkpoint-ha-example] Error during event processing ExecutionScope{ resource id: ResourceID{name='basic-checkpoint-ha-example', namespace='default'}, version: 350553} failed.
      flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.RuntimeException: This indicates a bug...
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
      flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
      flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
      flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
      flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown Source)
      flink-kubernetes-operator Caused by: java.lang.RuntimeException: This indicates a bug...
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
      flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
      flink-kubernetes-operator     ... 13 more
      

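      The root cause is that DeploymentFailedException was originally created so that the observer could signal a JobManager deployment failure after the cluster had already been submitted. The controller's error handler therefore sets the jmDeploymentStatus to ERROR and the job state to RECONCILING when it catches this exception. In the next loop, ApplicationReconciler.deploy rejects that status as invalid ("This indicates a bug..."), so the original HA metadata error is no longer reported.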
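The two-loop failure above can be reproduced with a small, simplified Java model. All class, field and method names below are hypothetical stand-ins, not the operator's real API. The pre-deploy sanity check is an assumption inferred from the logs: "Invalid status for deployment" is raised at ApplicationReconciler.java:180, before the HA check reached via line 195, and it fires once jobManagerDeploymentStatus=ERROR and state=RECONCILING. The set of "terminal" job states is likewise an assumption.

```java
import java.util.Set;

// Hypothetical, simplified model of the failure loop in the logs above.
// None of these types or methods are the operator's real API.
public class CurrentBehaviourSketch {

    enum JmStatus { MISSING, DEPLOYING, READY, ERROR }

    // Assumption: job states the pre-deploy sanity check accepts as "terminal".
    static final Set<String> TERMINAL_JOB_STATES = Set.of("FINISHED", "FAILED", "CANCELED");

    static final class Status {
        JmStatus jmDeploymentStatus = JmStatus.MISSING; // JM deployment gone after suspend
        String jobState = "SUSPENDED";
        String error;
    }

    // Stand-in for DeploymentFailedException, meant for post-submission JM failures.
    static final class DeploymentFailedException extends RuntimeException {
        DeploymentFailedException(String message) {
            super(message);
        }
    }

    // Stand-in for ApplicationReconciler.deploy: sanity check first, HA check second.
    static void deploy(Status status, boolean haMetadataExists) {
        if (status.jmDeploymentStatus != JmStatus.MISSING
                && !TERMINAL_JOB_STATES.contains(status.jobState)) {
            throw new RuntimeException("This indicates a bug...");
        }
        if (!haMetadataExists) {
            throw new DeploymentFailedException(
                    "HA metadata not available to restore from last state.");
        }
        status.jmDeploymentStatus = JmStatus.DEPLOYING;
    }

    // Stand-in for the controller's error handling.
    static void reconcile(Status status, boolean haMetadataExists) {
        try {
            deploy(status, haMetadataExists);
            status.error = null;
        } catch (DeploymentFailedException e) {
            // Handler assumes the JM failed after submission and rewrites the status.
            status.jmDeploymentStatus = JmStatus.ERROR;
            status.jobState = "RECONCILING";
            status.error = e.getMessage();
        } catch (RuntimeException e) {
            // Any other error is only recorded.
            status.error = e.getMessage();
        }
    }

    public static void main(String[] args) {
        Status status = new Status();
        reconcile(status, false); // loop 1: the real cause is reported
        System.out.println(status.jmDeploymentStatus + " | " + status.error);
        reconcile(status, false); // loop 2: the real cause is hidden
        System.out.println(status.jmDeploymentStatus + " | " + status.error);
    }
}
```

In this model, loop 1 records the HA metadata error, while loop 2 records only "This indicates a bug...", mirroring the RESTOREFAILED event followed by the CLUSTERDEPLOYMENTEXCEPTION event in the logs.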

      To avoid this, we should introduce a new exception type or use a more suitable existing one. In most of these cases we should not touch the jmDeploymentStatus or the job status at all, and should simply retrigger the reconciliation. This keeps the CR in an error loop that keeps emitting warnings, but that is expected in these critical failure scenarios.
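One possible shape of the proposal, as a simplified Java model: submission-time errors get their own exception type, and the handler for it records the error and asks for a retry without touching the deployment or job status. ClusterSubmissionException, the Optional<Duration> retry result and the 15-second delay are hypothetical illustrations, not the fix that was actually adopted. The sanity check and terminal job states are the same assumptions as in the model of the current behaviour.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of the proposed handling; not the operator's real API.
public class ProposedBehaviourSketch {

    enum JmStatus { MISSING, DEPLOYING, READY, ERROR }

    // Assumption: job states the pre-deploy sanity check accepts as "terminal".
    static final Set<String> TERMINAL_JOB_STATES = Set.of("FINISHED", "FAILED", "CANCELED");

    static final class Status {
        JmStatus jmDeploymentStatus = JmStatus.MISSING;
        String jobState = "SUSPENDED";
        String error;
    }

    // Observer-side signal: the JM deployment failed after it was submitted.
    static final class DeploymentFailedException extends RuntimeException {
        DeploymentFailedException(String message) {
            super(message);
        }
    }

    // Hypothetical new type for submission-time errors (e.g. HA metadata checks).
    static final class ClusterSubmissionException extends RuntimeException {
        ClusterSubmissionException(String message) {
            super(message);
        }
    }

    static void deploy(Status status, boolean haMetadataExists) {
        if (status.jmDeploymentStatus != JmStatus.MISSING
                && !TERMINAL_JOB_STATES.contains(status.jobState)) {
            throw new IllegalStateException("This indicates a bug...");
        }
        if (!haMetadataExists) {
            throw new ClusterSubmissionException(
                    "HA metadata not available to restore from last state.");
        }
        status.jmDeploymentStatus = JmStatus.DEPLOYING;
    }

    // Returns the delay after which reconciliation should be retriggered, if any.
    // Unexpected errors (such as the sanity check) propagate to the caller.
    static Optional<Duration> reconcile(Status status, boolean haMetadataExists, Duration retryDelay) {
        try {
            deploy(status, haMetadataExists);
            status.error = null;
            return Optional.empty();
        } catch (ClusterSubmissionException e) {
            // Leave jmDeploymentStatus and jobState untouched: record the error and retry.
            status.error = e.getMessage();
            return Optional.of(retryDelay);
        } catch (DeploymentFailedException e) {
            // Only genuine post-submission JM failures rewrite the deployment status.
            status.jmDeploymentStatus = JmStatus.ERROR;
            status.jobState = "RECONCILING";
            status.error = e.getMessage();
            return Optional.of(retryDelay);
        }
    }

    public static void main(String[] args) {
        Status status = new Status();
        for (int loop = 1; loop <= 2; loop++) {
            Optional<Duration> retry = reconcile(status, false, Duration.ofSeconds(15));
            System.out.println("loop " + loop + ": " + status.jmDeploymentStatus
                    + " | " + status.error + " | retry=" + retry.isPresent());
        }
    }
}
```

Because the status is left alone, every loop reports the same HA metadata error and the sanity check never fires. The CR stays in the expected error loop until the user restores manually, after which the next reconciliation deploys normally.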

          People

            pvary Peter Vary
            gyfora Gyula Fora
