Details

Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Hadoop Flags: Reviewed
Description
A job submit time of -1 causes an IllegalArgumentException when the job history file name is parsed, and JOB_INIT_FAILED is what leaves the -1 submit time in JobIndexInfo.
We found the following job history file name, which causes an IllegalArgumentException when the job status is parsed out of the file name:
job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
The stack trace for the IllegalArgumentException is:

2015-02-10 04:54:01,863 WARN org.apache.hadoop.mapreduce.v2.hs.PartialJob: Exception while parsing job state. Defaulting to KILLED
java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.mapreduce.v2.api.records.JobState.0
    at java.lang.Enum.valueOf(Enum.java:236)
    at org.apache.hadoop.mapreduce.v2.api.records.JobState.valueOf(JobState.java:21)
    at org.apache.hadoop.mapreduce.v2.hs.PartialJob.getState(PartialJob.java:82)
    at org.apache.hadoop.mapreduce.v2.hs.PartialJob.<init>(PartialJob.java:59)
    at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getAllPartialJobs(CachedHistoryStorage.java:159)
    at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getPartialJobs(CachedHistoryStorage.java:173)
    at org.apache.hadoop.mapreduce.v2.hs.JobHistory.getPartialJobs(JobHistory.java:284)
    at org.apache.hadoop.mapreduce.v2.hs.webapp.HsWebServices.getJobs(HsWebServices.java:212)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
    at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$TypeOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:185)
    at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
    at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:288)
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1469)
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1400)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1349)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1339)
    at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:416)
    at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:537)
    at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:886)
    at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
    at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
    at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
    at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
    at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1223)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
    at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
    at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
    at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
    at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
    at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767)
    at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
    at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
    at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
    at org.mortbay.jetty.Server.handle(Server.java:326)
    at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
    at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
    at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
    at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
    at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
    at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
    at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
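The root cause of the parse failure is a field shift: FileNameIndexUtils separates the index fields in the file name with "-", and a submit time of -1 contributes its own "-". The following minimal standalone sketch (our illustration, not the Hadoop parser itself) shows the shift:

public class JhistNameShiftDemo {
  public static void main(String[] args) {
    String name = "job_1418398645407_115853--1-worun-"
        + "kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D"
        + "-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist";
    // Expected field order when splitting on "-":
    // 0=jobId 1=submitTime 2=user 3=jobName 4=finishTime
    // 5=numMaps 6=numReduces 7=jobStatus 8=queueName 9=jobStartTime
    String[] parts = name.split("-");
    for (int i = 0; i < parts.length; i++) {
      System.out.println(i + ": " + parts[i]);
    }
    // The "-" of the "-1" submit time collides with the delimiter and
    // yields an empty token at index 1, shifting all later fields by one.
    // Index 7 then holds "0" (really the numReduces field), so
    // JobState.valueOf("0") throws the IllegalArgumentException above.
  }
}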
When an IOException is thrown in JobImpl#setup, the job submit time in JobHistoryEventHandler#MetaInfo#JobIndexInfo is never updated, so it keeps its initial value of -1:
this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null, queueName);
The following is the sequence of events that produces the -1 job submit time:
1. A job is created in MRAppMaster#serviceStart; after creation the new job is in state JobStateInternal.NEW:
job = createJob(getConfig(), forcedState, shutDownMessage);
2. JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart:
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
// Send init to the job (this does NOT trigger job execution)
// This is a synchronous call, not an event through dispatcher. We want
// job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent);
3. After JobImpl receives JobEventType.JOB_INIT, it calls InitTransition#transition:
.addTransition
(JobStateInternal.NEW,
EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
JobEventType.JOB_INIT,
new InitTransition())
4. The exception is then thrown from setup(job) in InitTransition#transition, before the JobSubmittedEvent is handled. The JobSubmittedEvent is what updates the job submit time, so because of the exception the submit time stays at its initial value of -1.
This is the code of InitTransition#transition:

public JobStateInternal transition(JobImpl job, JobEvent event) {
  job.metrics.submittedJob(job);
  job.metrics.preparingJob(job);

  if (job.newApiCommitter) {
    job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
  } else {
    job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
        job.conf, job.oldJobId);
  }

  try {
    setup(job);
    job.fs = job.getFileSystem(job.conf);

    //log to job history
    JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
        job.conf.get(MRJobConfig.JOB_NAME, "test"),
        job.conf.get(MRJobConfig.USER_NAME, "mapred"),
        job.appSubmitTime,
        job.remoteJobConfFile.toString(),
        job.jobACLs, job.queueName,
        job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
        job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
        job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
        getWorkflowAdjacencies(job.conf),
        job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
    job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
    //TODO JH Verify jobACLs, UserName via UGI?

    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
    job.numMapTasks = taskSplitMetaInfo.length;
    job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

    if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
      job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
    } else if (job.numMapTasks == 0) {
      job.reduceWeight = 0.9f;
    } else if (job.numReduceTasks == 0) {
      job.mapWeight = 0.9f;
    } else {
      job.mapWeight = job.reduceWeight = 0.45f;
    }

    checkTaskLimits();

    long inputLength = 0;
    for (int i = 0; i < job.numMapTasks; ++i) {
      inputLength += taskSplitMetaInfo[i].getInputDataLength();
    }
    job.makeUberDecision(inputLength);

    job.taskAttemptCompletionEvents =
        new ArrayList<TaskAttemptCompletionEvent>(
            job.numMapTasks + job.numReduceTasks + 10);
    job.mapAttemptCompletionEvents =
        new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
    job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
        job.numMapTasks + job.numReduceTasks + 10);

    job.allowedMapFailuresPercent =
        job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
    job.allowedReduceFailuresPercent =
        job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);

    // create the Tasks but don't start them yet
    createMapTasks(job, inputLength, taskSplitMetaInfo);
    createReduceTasks(job);

    job.metrics.endPreparingJob(job);
    return JobStateInternal.INITED;
  } catch (Exception e) {
    LOG.warn("Job init failed", e);
    job.metrics.endPreparingJob(job);
    job.addDiagnostic("Job init failed : "
        + StringUtils.stringifyException(e));
    // Leave job in the NEW state. The MR AM will detect that the state is
    // not INITED and send a JOB_INIT_FAILED event.
    return JobStateInternal.NEW;
  }
}
This is the code of JobImpl#setup:

protected void setup(JobImpl job) throws IOException {
  String oldJobIDString = job.oldJobId.toString();
  String user =
      UserGroupInformation.getCurrentUser().getShortUserName();
  Path path = MRApps.getStagingAreaDir(job.conf, user);
  if (LOG.isDebugEnabled()) {
    LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
  }

  job.remoteJobSubmitDir =
      FileSystem.get(job.conf).makeQualified(
          new Path(path, oldJobIDString));
  job.remoteJobConfFile =
      new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

  // Prepare the TaskAttemptListener server for authentication of Containers
  // TaskAttemptListener gets the information via jobTokenSecretManager.
  JobTokenIdentifier identifier =
      new JobTokenIdentifier(new Text(oldJobIDString));
  job.jobToken =
      new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
  job.jobToken.setService(identifier.getJobId());
  // Add it to the jobTokenSecretManager so that TaskAttemptListener server
  // can authenticate containers(tasks)
  job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
  LOG.info("Adding job token for " + oldJobIDString
      + " to jobTokenSecretManager");

  // If the job client did not setup the shuffle secret then reuse
  // the job token secret for the shuffle.
  if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
    LOG.warn("Shuffle secret key missing from job credentials."
        + " Using job token secret as shuffle secret.");
    TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
        job.jobCredentials);
  }
}
5. Because of the IOException from JobImpl#setup, the new job is still in state JobStateInternal.NEW:

} catch (Exception e) {
  LOG.warn("Job init failed", e);
  job.metrics.endPreparingJob(job);
  job.addDiagnostic("Job init failed : "
      + StringUtils.stringifyException(e));
  // Leave job in the NEW state. The MR AM will detect that the state is
  // not INITED and send a JOB_INIT_FAILED event.
  return JobStateInternal.NEW;
}
In the following code in MRAppMaster#serviceStart, the MR AM detects that the state is not INITED and sends a JOB_INIT_FAILED event:

// If job is still not initialized, an error happened during
// initialization. Must complete starting all of the services so failure
// events can be processed.
initFailed =
    (((JobImpl)job).getInternalState() != JobStateInternal.INITED);

if (initFailed) {
  JobEvent initFailedEvent =
      new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
  jobEventDispatcher.handle(initFailedEvent);
} else {
  // All components have started, start the job.
  startJobs();
}
6. After JobImpl receives JOB_INIT_FAILED, it calls InitFailedTransition#transition and enters state JobStateInternal.FAIL_ABORT:
.addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
JobEventType.JOB_INIT_FAILED,
new InitFailedTransition())
7. JobImpl sends a CommitterJobAbortEvent in InitFailedTransition#transition:

public void transition(JobImpl job, JobEvent event) {
  job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
      job.jobContext,
      org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
8. The CommitterJobAbortEvent is handled by CommitterEventHandler#handleJobAbort, which sends a JobAbortCompletedEvent (JobEventType.JOB_ABORT_COMPLETED):

protected void handleJobAbort(CommitterJobAbortEvent event) {
  cancelJobCommit();

  try {
    committer.abortJob(event.getJobContext(), event.getFinalState());
  } catch (Exception e) {
    LOG.warn("Could not abort job", e);
  }

  context.getEventHandler().handle(new JobAbortCompletedEvent(
      event.getJobID(), event.getFinalState()));
}
9. After JobImpl receives JOB_ABORT_COMPLETED, it calls JobAbortCompletedTransition#transition and enters state JobStateInternal.FAILED:
.addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
JobEventType.JOB_ABORT_COMPLETED,
new JobAbortCompletedTransition())
10. JobAbortCompletedTransition#transition calls JobImpl#unsuccessfulFinish, which sends a JobUnsuccessfulCompletionEvent carrying the finish time:

public void transition(JobImpl job, JobEvent event) {
  JobStateInternal finalState = JobStateInternal.valueOf(
      ((JobAbortCompletedEvent) event).getFinalState().name());
  job.unsuccessfulFinish(finalState);
}

private void unsuccessfulFinish(JobStateInternal finalState) {
  if (finishTime == 0) setFinishTime();
  cleanupProgress = 1.0f;
  JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
      new JobUnsuccessfulCompletionEvent(oldJobId,
          finishTime,
          succeededMapTaskCount,
          succeededReduceTaskCount,
          finalState.toString(),
          diagnostics);
  eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
  finished(finalState);
}
11. The JobUnsuccessfulCompletionEvent is handled by JobHistoryEventHandler#handleEvent with event type EventType.JOB_FAILED. In the following code you can see that JobIndexInfo#finishTime is set correctly, but JobIndexInfo#submitTime and JobIndexInfo#jobStartTime are still -1:

if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
    || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
  try {
    JobUnsuccessfulCompletionEvent jucEvent =
        (JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
    mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
    mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
    mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
    mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
    closeEventWriter(event.getJobID());
    processDoneFiles(event.getJobID());
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
}
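For contrast, on the successful path the submit time is filled in when the JOB_SUBMITTED history event arrives (see step 4). Roughly, the JOB_SUBMITTED branch of JobHistoryEventHandler#handleEvent looks like the following sketch (paraphrased from memory, the exact code may differ):

// Sketch (paraphrased): the JOB_SUBMITTED branch is what would have set
// the submit time, but it is never reached on the JOB_INIT_FAILED path.
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
  JobSubmittedEvent jobSubmittedEvent =
      (JobSubmittedEvent) event.getHistoryEvent();
  mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
}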
The bad job history file name in our log is "job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist".
From the file name you can see that submitTime is -1, finishTime is 1423572836007, and jobStartTime is 1423572836007.
The jobStartTime is not -1; it is the same as the finishTime.
This is because jobStartTime is handled specially in FileNameIndexUtils#getDoneFileName:
//JobStartTime
if (indexInfo.getJobStartTime() >= 0) {
  sb.append(indexInfo.getJobStartTime());
} else {
  sb.append(indexInfo.getFinishTime());
}
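A hypothetical way to harden the file name would be to guard submitTime with the same non-negative check, so a bare "-1" never injects an extra delimiter. This is only a sketch of the idea, not necessarily the patch that resolved this issue:

//SubmitTime (hypothetical guard mirroring the jobStartTime handling above;
//not necessarily the committed fix)
if (indexInfo.getSubmitTime() >= 0) {
  sb.append(indexInfo.getSubmitTime());
} else {
  sb.append(0); // any non-negative placeholder keeps the field count intact
}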