Does 'node unhealthy' need to be treated differently from 'TooManyFetchFailures'? That is, Killed versus Failed. This ends up with node failures not counting towards the limit on task attempts.
Yes, I think so. In our experience, we are pre-emptively taking action here on a task that might actually be fine, and it should be an infrequent action.
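The distinction matters for the retry budget. Here is a minimal sketch of the idea, using a hypothetical AttemptBudget class (not the actual MR TaskImpl logic): only FAILED attempts should consume the per-task attempt limit, while KILLED attempts (e.g. pre-emptive kills on an unhealthy node) should be free retries.

```java
// Sketch: only FAILED attempts consume the retry budget; KILLED attempts
// (not the task's fault) do not count toward maxAttempts.
public class AttemptBudget {
    public enum Outcome { SUCCEEDED, FAILED, KILLED }

    private final int maxAttempts;
    private int failedAttempts = 0;

    public AttemptBudget(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Record a finished attempt; return true if the task may be retried. */
    public boolean recordAndMayRetry(Outcome outcome) {
        if (outcome == Outcome.FAILED) {
            failedAttempts++;
        }
        // KILLED attempts are deliberately not counted here.
        return failedAttempts < maxAttempts;
    }

    public int getFailedAttempts() {
        return failedAttempts;
    }
}
```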
JOB_UPDATED_NODES needs to be handled in the JOB_INIT state. Very small chance of hitting this.
My understanding was that scheduling happens when the job moves from the INIT to the RUNNING state via StartTransition(). Unless allocate() is called on the RM, it will not return any unhealthy machines. So I thought a JOB_UPDATED_NODES event could never arrive before the job moves into the RUNNING state. Can you please point out the scenario you are thinking about?
I can make the change for safety reasons, just in case.
Minor: JobImpl.actOnUsableNode can get the task type from the id itself. It doesn't need to fetch the actual task.
Unless you really want this, I would prefer to keep it the way it's currently written. I prefer not to depend on string name encodings.
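For context, the reviewer's suggestion can be sketched with simplified stand-ins for the MRv2 id records (hypothetical classes, not the real org.apache.hadoop.mapreduce.v2 types): if the id object carries the task type as an enum, no string parsing and no Task lookup is needed.

```java
// Sketch: reading the task type from the id object itself rather than
// fetching the Task or parsing a string-encoded name.
public class TaskIds {
    public enum TaskType { MAP, REDUCE }

    /** Simplified stand-in for an MRv2 TaskId record. */
    public static class TaskId {
        private final TaskType type;
        private final int id;

        public TaskId(TaskType type, int id) {
            this.type = type;
            this.id = id;
        }

        public TaskType getTaskType() { return type; }
        public int getId() { return id; }
    }

    /** The type is available directly on the id; no Task lookup required. */
    public static boolean isMap(TaskId taskId) {
        return taskId.getTaskType() == TaskType.MAP;
    }
}
```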
Minor: "if this attempt is not successful" this comment in JobImpl can be removed. It's removing an entry from a successfulAttempt index.
That was a question I had, and I put it in the comments. For a TaskAttemptCompletedEventTransition, the code removes the previous successful entry from successAttemptCompletionEventNoMap. It then checks whether the current attempt is successful, and in that case adds it to successAttemptCompletionEventNoMap. But what if the current attempt is not successful? We have now removed the previous successful attempt too. Is that the desired behavior? This question is independent of this jira.
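The pattern being questioned can be illustrated with a small sketch (hypothetical SuccessIndex class, not the actual JobImpl code): an unconditional remove followed by a conditional put means a failed attempt erases the previously recorded successful entry.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the remove-then-conditionally-add pattern under discussion.
public class SuccessIndex {
    private final Map<Integer, Integer> successAttemptEventNo = new HashMap<>();

    /**
     * Mirrors the questioned transition: always remove the previous
     * successful attempt's entry, then re-add only if the new attempt
     * succeeded. A failed attempt therefore drops the old entry.
     */
    public void onAttemptCompletion(int taskId, int eventNo, boolean succeeded) {
        successAttemptEventNo.remove(taskId);
        if (succeeded) {
            successAttemptEventNo.put(taskId, eventNo);
        }
    }

    public boolean hasSuccessfulEntry(int taskId) {
        return successAttemptEventNo.containsKey(taskId);
    }
}
```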
In KilledAfterSuccessTransition - createJobCounterUpdateEventTAFailed should be createJobCounterUpdateEventTAKilled
TaskImpl.handleAttemptCompletion - finishedAttempts - this will end up double counting the same task attempt, since it's also used in other transitions.
I have moved the finishedTask increment out of that function and made it explicit in every transition that requires it.
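The shape of that refactor can be sketched as follows (hypothetical TaskCounters class, not the real TaskImpl): the shared completion helper does no counting, and each transition that actually finishes an attempt increments the counter explicitly, so an attempt that passes through the helper from two different transitions is not counted twice.

```java
// Sketch of the refactor: the shared helper no longer bumps the
// finished-attempts counter; transitions increment it explicitly.
public class TaskCounters {
    private int finishedAttempts = 0;

    /** Shared helper: side effects common to all completions, no counting. */
    public void handleAttemptCompletion(/* attempt, completion event, ... */) {
        // e.g. record the completion event, notify interested parties
    }

    /** Called explicitly by exactly the transitions that finish an attempt. */
    public void incrementFinishedAttempts() {
        finishedAttempts++;
    }

    public int getFinishedAttempts() {
        return finishedAttempts;
    }
}
```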
In the same context I have a question in the comments in MapRetroactiveFailureTransition: why is this not calling handleAttemptCompletion? My understanding is that handleAttemptCompletion is used to notify reducers about changes in map outputs. So if a map failed after succeeding, the reducers should know about it so that they can abandon its outputs before hitting too many fetch failures. Is that not so?
Does the JobHistoryParser need some more changes - to unset fields which may have been set previously by Map/ReduceAttemptSuccessfulEvents and TaskFinishedEvent?
Done. Reset all fields set in handleTaskFinishedEvent. Others are already handled in the existing code.
For running tasks - shouldn't running Reduce attempts also be killed?
My understanding of the existing behavior in MRv1 is that only maps are pre-emptively terminated, for performance reasons.
RMContainerAllocator.handleUpdatedNodes - instead of fetching the nodeId via appContext, job, etc., the nodeId can be stored with the AssignedRequest. 1) getTask and getAttempt require read locks - these calls, made every second, can be avoided. 2) There's an unlikely race where the nodeId may not yet be assigned in the TaskAttempt (if the dispatcher thread is backlogged). TaskAttemptId.getNodeId() can be avoided; getContainerManagerAddress can be used instead.
Sorry, I did not find getContainerManagerAddress(). The map in AssignedRequests stores ContainerId, and it's not possible to get the nodeId from it. What are you proposing?
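One reading of the reviewer's proposal, sketched with simplified stand-ins for the YARN records (hypothetical AssignedRequests/Assignment classes, with ids as plain strings): capture the node at container-assignment time, so handleUpdatedNodes can map attempts to nodes without going through appContext, job, getTask, and getAttempt, and without racing the dispatcher.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: record the node when the container is assigned, so later
// lookups need no read locks and no dispatcher-dependent state.
public class AssignedRequests {
    /** Simplified stand-in pairing a container with its node. */
    public static final class Assignment {
        final String containerId;
        final String nodeId;

        Assignment(String containerId, String nodeId) {
            this.containerId = containerId;
            this.nodeId = nodeId;
        }
    }

    private final Map<String, Assignment> byAttempt = new HashMap<>();

    public void assign(String attemptId, String containerId, String nodeId) {
        byAttempt.put(attemptId, new Assignment(containerId, nodeId));
    }

    /** The node was captured at assignment time; no TaskAttempt lookup. */
    public String nodeOf(String attemptId) {
        Assignment a = byAttempt.get(attemptId);
        return a == null ? null : a.nodeId;
    }
}
```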
Not related to this patch. Does JOB_TASK_COMPLETED need to be handled (ignored) in additional states?
It does not look like it, but there may be race conditions I have not thought of. Looking further, it seems that the action on this event checks for job completion in TaskCompletedTransition. TaskCompletedTransition increments job.completedTaskCount irrespective of whether the task succeeded, was killed, or failed. Then TaskCompletedTransition.checkJobCompleteSuccess() checks job.completedTaskCount == job.tasks.size() for completion. How is this working? Won't enough killed/failed tasks plus completed tasks trigger job completion? Or is that expected behavior?
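The concern can be illustrated with a simplified sketch (hypothetical JobCompletion class, not the actual JobImpl, which may guard this through other transitions): if the completed count is bumped for every outcome, an equality check against the total task count can report completion even when not every task succeeded.

```java
// Sketch of the completion-check concern: completedTaskCount is
// incremented for succeeded, failed, and killed tasks alike, so a
// count-based check can fire without all tasks having succeeded.
public class JobCompletion {
    private final int totalTasks;
    private int completedTaskCount = 0;
    private int succeededTaskCount = 0;

    public JobCompletion(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    public void onTaskCompleted(boolean succeeded) {
        completedTaskCount++; // incremented regardless of outcome
        if (succeeded) {
            succeededTaskCount++;
        }
    }

    /** Count-based, outcome-blind check, as in the question above. */
    public boolean looksCompleteSuccess() {
        return completedTaskCount == totalTasks;
    }

    public boolean allSucceeded() {
        return succeededTaskCount == totalTasks;
    }
}
```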