Description
MAPREDUCE-4371 added cyclic dependency check for a topologic sorted Job DAG. However it is somehow not properly implemented for the `sort` phase.
In the while loop as below:
while (!SourceSet.isEmpty()) { ControlledJob controlledJob = SourceSet.iterator().next(); SourceSet.remove(controlledJob); if (controlledJob.getDependentJobs() != null) { for (int i = 0; i < controlledJob.getDependentJobs().size(); i++) { ControlledJob depenControlledJob = controlledJob.getDependentJobs().get(i); processedMap.get(controlledJob).add(depenControlledJob); if (!hasInComingEdge(controlledJob, jobList, processedMap)) { SourceSet.add(depenControlledJob); } } } }
It adds the parent/dependent Job node to the processedMap for current Job node. And then it's supposed to add the parent/dependent Job node into SourceSet if the parent Job node is not processed yet or does not have any child node except the current processed one. However it mistakenly checks the current one: hasInComingEdge(controlledJob) , while adding the parent node to the SourceSet: SourceSet.add(depenControlledJob)
This breaks Job DAGs like below:
job1.addDependingJob(job2);
job2.addDependingJob(job3);
job4.addDependingJob(job2);
Above code reports a cyclic dependency for job2, because job2 is added into SourceSet twice.