Details
- Bug
- Status: Resolved
- Critical
- Resolution: Fixed
- 1.5.0
- None
Description
When the JobManager removes a JobGraph, for example after cancel() is invoked, the following code is executed:
    val futureOption = currentJobs.get(jobID) match {
      case Some((eg, _)) =>
        val result = if (removeJobFromStateBackend) {
          val futureOption = Some(future {
            try {
              // ...otherwise, we can have lingering resources when there is a concurrent shutdown
              // and the ZooKeeper client is closed. Not removing the job immediately allow the
              // shutdown to release all resources.
              submittedJobGraphs.removeJobGraph(jobID)
            } catch {
              case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
            }
          }(context.dispatcher))

          try {
            archive ! decorateMessage(
              ArchiveExecutionGraph(
                jobID,
                ArchivedExecutionGraph.createFrom(eg)))
          } catch {
            case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
          }

          futureOption
        } else {
          None
        }

        currentJobs.remove(jobID)

        result
      case None => None
    }

    // remove all job-related BLOBs from local and HA store
    libraryCacheManager.unregisterJob(jobID)
    blobServer.cleanupJob(jobID, removeJobFromStateBackend)

    jobManagerMetricGroup.removeJob(jobID)

    futureOption
    }
This removes the job graph asynchronously, but removes the BLOB files belonging to the job synchronously. As far as I understand, this means the BLOBs can already be gone while the removal of the job graph from submittedJobGraphs is still pending, or has failed. If the JobManager fails in that state and a new leader is elected, the new leader may try to recover the job, but recovery will fail with an exception because the job's BLOBs have already been removed.
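One way to picture the ordering concern is a minimal sketch in which BLOB cleanup is chained onto the job graph removal and runs only once that removal has succeeded. This is an illustration under stated assumptions, not the fix that was applied in Flink: `JobGraphStore`, `BlobStore`, `OrderedCleanup`, and `removeJob` are hypothetical stand-ins for `submittedJobGraphs` and `blobServer`, and job IDs are plain strings here.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical stand-ins for submittedJobGraphs and blobServer; not Flink's real interfaces.
trait JobGraphStore { def removeJobGraph(jobId: String): Unit }
trait BlobStore { def cleanupJob(jobId: String): Unit }

object OrderedCleanup {
  /** Removes the job graph first; BLOBs are deleted only if that removal succeeded.
    * Completes with false if either step fails with a non-fatal error. */
  def removeJob(jobId: String, graphs: JobGraphStore, blobs: BlobStore)
               (implicit ec: ExecutionContext): Future[Boolean] =
    Future(graphs.removeJobGraph(jobId))
      .map { _ =>
        // Reached only when the job graph is gone, so no recoverable job
        // can be left pointing at deleted BLOBs.
        blobs.cleanupJob(jobId)
        true
      }
      .recover { case NonFatal(_) => false }
}

object Demo extends App {
  import ExecutionContext.Implicits.global

  val blobsDeleted = new AtomicBoolean(false)

  // Simulates a job graph store whose removal fails, e.g. during a concurrent shutdown.
  val failingGraphs = new JobGraphStore {
    def removeJobGraph(jobId: String): Unit =
      throw new RuntimeException("simulated: ZooKeeper client closed")
  }
  val blobs = new BlobStore {
    def cleanupJob(jobId: String): Unit = blobsDeleted.set(true)
  }

  val removed = Await.result(OrderedCleanup.removeJob("job-1", failingGraphs, blobs), 5.seconds)
  println(s"removed=$removed blobsDeleted=${blobsDeleted.get}")
}
```

With the failing store above, the BLOBs are left in place, so a newly elected leader that recovers the job would still find them. The trade-off is that BLOBs may linger after a failed removal and would need a later cleanup pass.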
Issue Links
- is related to FLINK-11665 Flink fails to remove JobGraph from ZK even though it reports it did (Closed)