  Hadoop Map/Reduce
  MAPREDUCE-6259

IllegalArgumentException due to missing job submit time


Details

    • Reviewed

    Description

      A -1 job submit time causes an IllegalArgumentException when the job history file name is parsed, and the JOB_INIT_FAILED path is what leaves the -1 job submit time in JobIndexInfo.
      We found the following job history file name, which causes an IllegalArgumentException when the job status is parsed out of the file name.

      job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
      

      The stack trace for the IllegalArgumentException is:

      2015-02-10 04:54:01,863 WARN org.apache.hadoop.mapreduce.v2.hs.PartialJob: Exception while parsing job state. Defaulting to KILLED
      java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.mapreduce.v2.api.records.JobState.0
      	at java.lang.Enum.valueOf(Enum.java:236)
      	at org.apache.hadoop.mapreduce.v2.api.records.JobState.valueOf(JobState.java:21)
      	at org.apache.hadoop.mapreduce.v2.hs.PartialJob.getState(PartialJob.java:82)
      	at org.apache.hadoop.mapreduce.v2.hs.PartialJob.<init>(PartialJob.java:59)
      	at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getAllPartialJobs(CachedHistoryStorage.java:159)
      	at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getPartialJobs(CachedHistoryStorage.java:173)
      	at org.apache.hadoop.mapreduce.v2.hs.JobHistory.getPartialJobs(JobHistory.java:284)
      	at org.apache.hadoop.mapreduce.v2.hs.webapp.HsWebServices.getJobs(HsWebServices.java:212)
      	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
      	at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$TypeOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:185)
      	at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
      	at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:288)
      	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
      	at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
      	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
      	at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
      	at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1469)
      	at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1400)
      	at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1349)
      	at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1339)
      	at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:416)
      	at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:537)
      	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:886)
      	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
      	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
      	at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
      	at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
      	at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
      	at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
      	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
      	at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
      	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
      	at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1223)
      	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
      	at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
      	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
      	at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
      	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
      	at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
      	at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
      	at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
      	at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767)
      	at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
      	at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
      	at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
      	at org.mortbay.jetty.Server.handle(Server.java:326)
      	at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
      	at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
      	at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
      	at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
      	at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
      	at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
      	at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
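
      The layout of a job history done file name is
      jobId-submitTime-user-jobName-finishTime-numMaps-numReduces-jobStatus-queueName-jobStartTime.jhist.
      With a submit time of -1, the leading minus sign adds an extra "-" delimiter, so every later field
      shifts one slot to the right and the slot that should hold the job status contains "0" instead of
      "FAILED", which is why JobState.valueOf fails above. The following is a minimal, self-contained
      sketch of that shift (for illustration only; it is not the actual FileNameIndexUtils parsing code,
      and it leaves out the ".jhist" suffix that the real parser strips before splitting):

          // Illustration only: how the "--1" produced by a -1 submit time shifts
          // the "-"-delimited fields of the history file name.
          public class DoneFileNameShiftDemo {
            public static void main(String[] args) {
              String badName = "job_1418398645407_115853--1-worun-"
                  + "kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D"
                  + "-1423572836007-0-0-FAILED-root.journaling-1423572836007";
              String[] parts = badName.split("-");
              // parts[1] is an empty token from "--1", so the job status slot
              // (index 7) now holds the numReduces value instead of "FAILED".
              System.out.println("parsed job status field: " + parts[7]); // prints "0"
            }
          }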
      

      When an IOException is thrown in JobImpl#setup, the job submit time in JobHistoryEventHandler#MetaInfo#JobIndexInfo is never updated and keeps its initial value of -1:

            this.jobIndexInfo =
                new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
                                 queueName);
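
      For illustration only (a hypothetical fragment, not code from the attached patch): unless a
      JobSubmittedEvent later causes setSubmitTime() to be called, the -1 passed to this constructor is
      exactly the value that FileNameIndexUtils#getDoneFileName serializes into the file name.

            // Illustration only: when job init fails, no JobSubmittedEvent is ever
            // written, so nothing calls setSubmitTime() and the -1 set above sticks.
            assert this.jobIndexInfo.getSubmitTime() == -1; // shows up as "--1" in the file name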
      

      The following is the sequence of events that leads to the -1 job submit time:
      1.
      A job is created in MRAppMaster#serviceStart and is in state JobStateInternal.NEW right after creation:

          job = createJob(getConfig(), forcedState, shutDownMessage);
      

      2.
      JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart:

            JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
            // Send init to the job (this does NOT trigger job execution)
            // This is a synchronous call, not an event through dispatcher. We want
            // job-init to be done completely here.
            jobEventDispatcher.handle(initJobEvent);
      

      3.
      After JobImpl receives JobEventType.JOB_INIT, it calls InitTransition#transition:

                .addTransition
                    (JobStateInternal.NEW,
                    EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
                    JobEventType.JOB_INIT,
                    new InitTransition())
      

      4.
      The exception is then thrown from setup(job) in InitTransition#transition before the JobSubmittedEvent is handled.
      The JobSubmittedEvent is what would update the job submit time, so because of the exception the submit time keeps its initial value of -1.
      This is the code of InitTransition#transition:

      public JobStateInternal transition(JobImpl job, JobEvent event) {
            job.metrics.submittedJob(job);
            job.metrics.preparingJob(job);
            if (job.newApiCommitter) {
              job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
            } else {
              job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(job.conf, job.oldJobId);
            }
            try {
              setup(job);
              job.fs = job.getFileSystem(job.conf);
              //log to job history
              JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
                    job.conf.get(MRJobConfig.JOB_NAME, "test"), 
                  job.conf.get(MRJobConfig.USER_NAME, "mapred"),
                  job.appSubmitTime,
                  job.remoteJobConfFile.toString(),
                  job.jobACLs, job.queueName,
                  job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
                  job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
                  job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
                  getWorkflowAdjacencies(job.conf),
                  job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
              job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
              //TODO JH Verify jobACLs, UserName via UGI?
      
              TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
              job.numMapTasks = taskSplitMetaInfo.length;
              job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
      
              if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
                job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
              } else if (job.numMapTasks == 0) {
                job.reduceWeight = 0.9f;
              } else if (job.numReduceTasks == 0) {
                job.mapWeight = 0.9f;
              } else {
                job.mapWeight = job.reduceWeight = 0.45f;
              }
      
              checkTaskLimits();
      
              long inputLength = 0;
              for (int i = 0; i < job.numMapTasks; ++i) {
                inputLength += taskSplitMetaInfo[i].getInputDataLength();
              }
      
              job.makeUberDecision(inputLength);
              
              job.taskAttemptCompletionEvents =
                  new ArrayList<TaskAttemptCompletionEvent>(
                      job.numMapTasks + job.numReduceTasks + 10);
              job.mapAttemptCompletionEvents =
                  new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
              job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
                  job.numMapTasks + job.numReduceTasks + 10);
      
              job.allowedMapFailuresPercent =
                  job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
              job.allowedReduceFailuresPercent =
                  job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
      
              // create the Tasks but don't start them yet
              createMapTasks(job, inputLength, taskSplitMetaInfo);
              createReduceTasks(job);
      
              job.metrics.endPreparingJob(job);
              return JobStateInternal.INITED;
            } catch (Exception e) {
              LOG.warn("Job init failed", e);
              job.metrics.endPreparingJob(job);
              job.addDiagnostic("Job init failed : "
                  + StringUtils.stringifyException(e));
              // Leave job in the NEW state. The MR AM will detect that the state is
              // not INITED and send a JOB_INIT_FAILED event.
              return JobStateInternal.NEW;
            }
          }
      

      This is the code of JobImpl#setup:

          protected void setup(JobImpl job) throws IOException {
      
            String oldJobIDString = job.oldJobId.toString();
            String user = 
              UserGroupInformation.getCurrentUser().getShortUserName();
            Path path = MRApps.getStagingAreaDir(job.conf, user);
            if(LOG.isDebugEnabled()) {
              LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
            }
      
            job.remoteJobSubmitDir =
                FileSystem.get(job.conf).makeQualified(
                    new Path(path, oldJobIDString));
            job.remoteJobConfFile =
                new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
      
            // Prepare the TaskAttemptListener server for authentication of Containers
            // TaskAttemptListener gets the information via jobTokenSecretManager.
            JobTokenIdentifier identifier =
                new JobTokenIdentifier(new Text(oldJobIDString));
            job.jobToken =
                new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
            job.jobToken.setService(identifier.getJobId());
            // Add it to the jobTokenSecretManager so that TaskAttemptListener server
            // can authenticate containers(tasks)
            job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
            LOG.info("Adding job token for " + oldJobIDString
                + " to jobTokenSecretManager");
      
            // If the job client did not setup the shuffle secret then reuse
            // the job token secret for the shuffle.
            if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
              LOG.warn("Shuffle secret key missing from job credentials."
                  + " Using job token secret as shuffle secret.");
              TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
                  job.jobCredentials);
            }
          }
      

      5.
      Because of the IOException from JobImpl#setup, the new job is still in state JobStateInternal.NEW:

            } catch (Exception e) {
              LOG.warn("Job init failed", e);
              job.metrics.endPreparingJob(job);
              job.addDiagnostic("Job init failed : "
                  + StringUtils.stringifyException(e));
              // Leave job in the NEW state. The MR AM will detect that the state is
              // not INITED and send a JOB_INIT_FAILED event.
              return JobStateInternal.NEW;
            }
      

      In the following code in MRAppMaster#serviceStart, the MR AM detects that the state is not INITED and sends a JOB_INIT_FAILED event:

            // If job is still not initialized, an error happened during
            // initialization. Must complete starting all of the services so failure
            // events can be processed.
            initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
          if (initFailed) {
            JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
            jobEventDispatcher.handle(initFailedEvent);
          } else {
            // All components have started, start the job.
            startJobs();
          }
      

      6.
      After JobImpl receives JOB_INIT_FAILED, it calls InitFailedTransition#transition and enters state JobStateInternal.FAIL_ABORT:

                .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
                    JobEventType.JOB_INIT_FAILED,
                    new InitFailedTransition())
      

      7.
      JobImpl sends a CommitterJobAbortEvent in InitFailedTransition#transition:

          public void transition(JobImpl job, JobEvent event) {
              job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
                      job.jobContext,
                      org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
          }
      

      8.
      The CommitterJobAbortEvent is handled by CommitterEventHandler#handleJobAbort, which sends a JobAbortCompletedEvent (JobEventType.JOB_ABORT_COMPLETED):

          protected void handleJobAbort(CommitterJobAbortEvent event) {
            cancelJobCommit();
            try {
              committer.abortJob(event.getJobContext(), event.getFinalState());
            } catch (Exception e) {
              LOG.warn("Could not abort job", e);
            }
            context.getEventHandler().handle(new JobAbortCompletedEvent(
                event.getJobID(), event.getFinalState()));
          }
      

      9.
      After JobImpl receives JOB_ABORT_COMPLETED, it calls JobAbortCompletedTransition#transition and enters state JobStateInternal.FAILED:

                .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
                    JobEventType.JOB_ABORT_COMPLETED,
                    new JobAbortCompletedTransition())
      

      10.
      JobAbortCompletedTransition#transition calls JobImpl#unsuccessfulFinish, which sends a JobUnsuccessfulCompletionEvent carrying the finish time:

          public void transition(JobImpl job, JobEvent event) {
            JobStateInternal finalState = JobStateInternal.valueOf(
                ((JobAbortCompletedEvent) event).getFinalState().name());
            job.unsuccessfulFinish(finalState);
    }

    private void unsuccessfulFinish(JobStateInternal finalState) {
      if (finishTime == 0) setFinishTime();
      cleanupProgress = 1.0f;
      JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
          new JobUnsuccessfulCompletionEvent(oldJobId,
              finishTime,
              succeededMapTaskCount,
              succeededReduceTaskCount,
              finalState.toString(),
              diagnostics);
      eventHandler.handle(new JobHistoryEvent(jobId,
          unsuccessfulJobEvent));
      finished(finalState);
    }
      

      11.
      The JobUnsuccessfulCompletionEvent is handled by JobHistoryEventHandler#handleEvent with type EventType.JOB_FAILED.
      In the following code you can see that JobIndexInfo#finishTime is set correctly, but JobIndexInfo#submitTime and JobIndexInfo#jobStartTime are still -1 because nothing in this failure path ever sets them:

            if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
                || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
              try {
                JobUnsuccessfulCompletionEvent jucEvent = 
                    (JobUnsuccessfulCompletionEvent) event
                    .getHistoryEvent();
                mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
                mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
                mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
                mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
                closeEventWriter(event.getJobID());
                processDoneFiles(event.getJobID());
              } catch (IOException e) {
                throw new YarnRuntimeException(e);
              }
            }
      

      The bad job history file name in our log is "job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist".
      From the file name you can see that submitTime is -1, finishTime is 1423572836007, and jobStartTime is 1423572836007.
      The jobStartTime is not -1 but is the same as finishTime, because jobStartTime is handled specially in FileNameIndexUtils#getDoneFileName:

          //JobStartTime
          if (indexInfo.getJobStartTime() >= 0) {
            sb.append(indexInfo.getJobStartTime());
          } else {
            sb.append(indexInfo.getFinishTime());
          }
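
      There is no corresponding fallback for the submit time, so the -1 is written into the file name
      verbatim. One conceivable guard (purely illustrative; not claimed to be what the attached
      MAPREDUCE-6259.000.patch actually does) would mirror the jobStartTime handling for the submit time:

          //SubmitTime -- hypothetical fallback, for illustration only
          if (indexInfo.getSubmitTime() >= 0) {
            sb.append(indexInfo.getSubmitTime());
          } else {
            sb.append(indexInfo.getFinishTime());
          }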
      

      Attachments

        1. MAPREDUCE-6259.000.patch
          13 kB
          Zhihai Xu


          People

            Assignee: Zhihai Xu
            Reporter: Zhihai Xu
            Votes: 0
            Watchers: 5
