Details
- Bug
- Status: Closed
- Blocker
- Resolution: Fixed
- None
Description
Currently most critical cluster submission errors, as well as the checks that validate HA metadata before deployment, end up throwing DeploymentFailedException.
This puts the operator into an inconsistent state and actually hides the original error in subsequent reconciliation loops:
flink-kubernetes-operator 2022-12-01 21:55:03,978 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info | UPGRADING | The resource is being upgraded
flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Event | Info | SUBMIT | Starting deployment
flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.s.AbstractFlinkService [INFO ][default/basic-checkpoint-ha-example] Deploying application cluster requiring last-state from HA metadata
flink-kubernetes-operator 2022-12-01 21:55:03,997 o.a.f.k.o.c.FlinkDeploymentController [ERROR][default/basic-checkpoint-ha-example] Flink Deployment failed
flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.validateHaMetadataExists(AbstractFlinkService.java:844)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:195)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
flink-kubernetes-operator at java.base/java.lang.Thread.run(Unknown Source)
flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Event | Warning | RESTOREFAILED | HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-checkpoint-ha-example] End of reconciliation
flink-kubernetes-operator 2022-12-01 21:55:04,054 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error | UPGRADING | {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]}
flink-kubernetes-operator 2022-12-01 21:55:19,056 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-checkpoint-ha-example] Starting reconciliation
flink-kubernetes-operator 2022-12-01 21:55:19,058 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][default/basic-checkpoint-ha-example] UPGRADE change(s) detected (FlinkDeploymentSpec[job.state=RUNNING] differs from FlinkDeploymentSpec[job.state=SUSPENDED]), starting reconciliation.
flink-kubernetes-operator 2022-12-01 21:55:19,092 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info | UPGRADING | The resource is being upgraded
flink-kubernetes-operator 2022-12-01 21:55:19,119 o.a.f.k.o.r.d.ApplicationReconciler [ERROR][default/basic-checkpoint-ha-example] Invalid status for deployment: FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=CarTopSpeedWindowingExample, jobId=8d5c59b7e960984cd845b9977754d2ef, state=RECONCILING, startTime=1669931677233, updateTime=1669931696153, savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=null, triggerTimestamp=null, triggerType=null, formatType=null, savepointHistory=[], lastPeriodicSavepointTimestamp=0)), error=null), clusterInfo={flink-version=1.15.2, flink-revision=69e8126 @ 2022-08-17T14:58:06+02:00}, jobManagerDeploymentStatus=ERROR, reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1669931719059, lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/TopSpeedWindowing.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"last-state","allowNonRestoredState":null},"restartNonce":2,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":5},"firstDeployment":false}}, lastStableSpec=null, state=UPGRADING)), taskManager=TaskManagerInfo(labelSelector=, replicas=0))
flink-kubernetes-operator 2022-12-01 21:55:19,133 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Event | Warning | CLUSTERDEPLOYMENTEXCEPTION | This indicates a bug...
flink-kubernetes-operator 2022-12-01 21:55:19,136 o.a.f.k.o.r.ReconciliationUtils [WARN ][default/basic-checkpoint-ha-example] Attempt count: 0, last attempt: false
flink-kubernetes-operator 2022-12-01 21:55:19,163 o.a.f.k.o.l.AuditUtils [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error | UPGRADING | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException: This indicates a bug...","throwableList":[{"type":"java.lang.RuntimeException","message":"This indicates a bug..."}]}
flink-kubernetes-operator 2022-12-01 21:55:19,164 i.j.o.p.e.ReconciliationDispatcher [ERROR][default/basic-checkpoint-ha-example] Error during event processing ExecutionScope{ resource id: ResourceID{name='basic-checkpoint-ha-example', namespace='default'}, version: 350553} failed.
flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.RuntimeException: This indicates a bug...
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
flink-kubernetes-operator at java.base/java.lang.Thread.run(Unknown Source)
flink-kubernetes-operator Caused by: java.lang.RuntimeException: This indicates a bug...
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
flink-kubernetes-operator ... 13 more
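The main cause here is that DeploymentFailedException was originally created so that the observer could signal a JobManager deployment failure (after the deployment was submitted). The error handler logic in the controller therefore updates the jmDeploymentStatus and the job state, which causes the problem: in the log above, the next reconciliation loop no longer reports the HA metadata error and instead fails with "Invalid status for deployment" and "This indicates a bug...".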
To avoid this we should introduce a new exception type or use something more suitable. In most of these cases we should not touch the jobManagerDeploymentStatus or the jobStatus, and should simply retrigger the reconciliation. This will keep the CR in an error loop that triggers warnings, but that is expected in these critical failure scenarios.
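The proposal can be illustrated with a minimal, self-contained sketch. It is not the operator's code: `Status`, `JmDeploymentStatus`, `handleError`, `SubmissionFailedException` and the 15-second retry delay are all hypothetical stand-ins chosen for illustration, and the nested `DeploymentFailedException` only mimics the existing class. The sketch assumes the handler records the error for both exception types but rewrites the JobManager deployment status and job state only for failures reported after submission.

```java
import java.time.Duration;

/** Simplified, hypothetical model of the controller's error handling; not operator code. */
final class SubmissionErrorHandlingSketch {

    /** Stand-in enum; the values are chosen for illustration only. */
    enum JmDeploymentStatus { READY, DEPLOYING, MISSING, ERROR }

    /** Stand-in for the parts of the resource status that matter here. */
    static final class Status {
        JmDeploymentStatus jobManagerDeploymentStatus = JmDeploymentStatus.MISSING;
        String jobState = "SUSPENDED";
        String error;
    }

    /** Mimics the existing exception, reserved for failures observed after submission. */
    static final class DeploymentFailedException extends RuntimeException {
        DeploymentFailedException(String message) { super(message); }
    }

    /** Hypothetical new type for errors raised before or during submission. */
    static final class SubmissionFailedException extends RuntimeException {
        SubmissionFailedException(String message) { super(message); }
    }

    /** Assumed retry delay; the real value would come from operator configuration. */
    static final Duration RETRY_DELAY = Duration.ofSeconds(15);

    /** Records the error and returns the delay after which reconciliation is retried. */
    static Duration handleError(RuntimeException e, Status status) {
        status.error = e.getMessage();
        if (e instanceof DeploymentFailedException) {
            // Only an observed JobManager failure may rewrite these fields.
            status.jobManagerDeploymentStatus = JmDeploymentStatus.ERROR;
            status.jobState = "RECONCILING";
        }
        // Submission errors leave the status untouched and simply retrigger.
        return RETRY_DELAY;
    }

    public static void main(String[] args) {
        Status status = new Status();
        handleError(new SubmissionFailedException("HA metadata not available"), status);
        System.out.println(status.jobManagerDeploymentStatus + " / " + status.jobState
                + " / " + status.error);
    }
}
```

With this split, a failed HA metadata check would surface the same error on every retry instead of being replaced by an invalid-status error in the next loop.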