From efbcb4851477200e6e2e1130f2c9491142b33ec1 Mon Sep 17 00:00:00 2001
From: "qianhao.zhou"
Date: Thu, 8 Jan 2015 11:04:20 +0800
Subject: [PATCH 1/3] fix bug

---
 job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java | 2 +-
 .../com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java    | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
index b89aca5..f320236 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
@@ -94,7 +94,7 @@ private String extractJobIDFromPath(String path) {
 
     private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
         for (CubeSegment segment : cubeInstance.getSegments()) {
-            if (segment.getUuid().equalsIgnoreCase(jobID)) {
+            if (segment.getLastBuildJobID().equalsIgnoreCase(jobID)) {
                 return segment;
             }
         }
diff --git a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java
index c635de3..4e476f1 100644
--- a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java
+++ b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java
@@ -12,6 +12,7 @@
 import com.kylinolap.job2.execution.ExecutableContext;
 import com.kylinolap.job2.execution.ExecuteResult;
 import com.kylinolap.job2.impl.threadpool.AbstractExecutable;
+import com.kylinolap.metadata.model.SegmentStatusEnum;
 import org.apache.commons.lang.StringUtils;
 
 import java.io.IOException;
@@ -72,6 +73,8 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
         mergedSegment.setSourceRecords(sourceCount);
         mergedSegment.setSourceRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(getCubingJobId());
+        mergedSegment.setStatus(SegmentStatusEnum.READY);
+        mergedSegment.setLastBuildTime(System.currentTimeMillis());
         //remove old segment
         cube.getSegments().removeAll(toBeRemoved);
         try {
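Taken together, the two hunks above make merge output both discoverable and queryable: MergeCuboidMapper now resolves the target segment by the job ID recorded at build time rather than by segment UUID, and the merged segment is explicitly promoted to READY with a fresh build timestamp. A minimal, self-contained sketch of that lookup-and-promote sequence follows; the Segment and Status types are simplified stand-ins for Kylin's CubeSegment and SegmentStatusEnum, not the real classes:

    import java.util.ArrayList;
    import java.util.List;

    // Stand-ins for Kylin's CubeSegment and SegmentStatusEnum (assumed shapes).
    enum Status { NEW, READY }

    class Segment {
        String lastBuildJobID;
        Status status = Status.NEW;
        long lastBuildTime;

        Segment(String jobId) { this.lastBuildJobID = jobId; }
    }

    public class MergePromotionSketch {

        // Mirrors the fixed MergeCuboidMapper.findSegmentWithUuid: match on the
        // job ID recorded at build time, not on the segment UUID.
        static Segment findSegmentByJobId(String jobId, List<Segment> segments) {
            for (Segment segment : segments) {
                if (segment.lastBuildJobID.equalsIgnoreCase(jobId)) {
                    return segment;
                }
            }
            throw new IllegalStateException("no segment built by job " + jobId);
        }

        public static void main(String[] args) {
            List<Segment> segments = new ArrayList<Segment>();
            segments.add(new Segment("job-a"));
            segments.add(new Segment("job-b"));

            // What UpdateCubeInfoAfterMergeExecutable now does for the merge output.
            Segment merged = findSegmentByJobId("job-b", segments);
            merged.status = Status.READY;
            merged.lastBuildTime = System.currentTimeMillis();
            System.out.println("segment of job-b promoted to " + merged.status);
        }
    }

Without the READY flip, the merged segment would survive the metadata update but never be visible to queries, which is presumably the bug the subject line refers to.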
From 1e52d5195909e0bfabaa53d3e78e3b3bda74b03f Mon Sep 17 00:00:00 2001
From: "qianhao.zhou"
Date: Thu, 8 Jan 2015 13:50:13 +0800
Subject: [PATCH 2/3] resume interrupted mr job

---
 .../kylinolap/job/hadoop/AbstractHadoopJob.java    |  4 ++
 .../kylinolap/job/tools/HadoopStatusChecker.java   | 42 +++++++-------
 .../com/kylinolap/job2/common/HadoopCmdOutput.java | 64 ++++++++--------------
 .../kylinolap/job2/common/MapReduceExecutable.java | 54 +++++++++++++-----
 .../job2/impl/threadpool/AbstractExecutable.java   |  6 +-
 5 files changed, 94 insertions(+), 76 deletions(-)

diff --git a/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
index d160770..37fb925 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
@@ -325,4 +325,8 @@ public void setAsync(boolean isAsync) {
         this.isAsync = isAsync;
     }
 
+    public Job getJob() {
+        return this.job;
+    }
+
 }
diff --git a/job/src/main/java/com/kylinolap/job/tools/HadoopStatusChecker.java b/job/src/main/java/com/kylinolap/job/tools/HadoopStatusChecker.java
index c11698a..d61fb30 100644
--- a/job/src/main/java/com/kylinolap/job/tools/HadoopStatusChecker.java
+++ b/job/src/main/java/com/kylinolap/job/tools/HadoopStatusChecker.java
@@ -129,29 +129,31 @@ private String getHttpResponse(String url) throws IOException {
             }
 
             HttpMethod get = new GetMethod(url);
-            client.executeMethod(get);
-
-            String redirect = null;
-            Header h = get.getResponseHeader("Refresh");
-            if (h != null) {
-                String s = h.getValue();
-                int cut = s.indexOf("url=");
-                if (cut >= 0) {
-                    redirect = s.substring(cut + 4);
+            try {
+                client.executeMethod(get);
+
+                String redirect = null;
+                Header h = get.getResponseHeader("Refresh");
+                if (h != null) {
+                    String s = h.getValue();
+                    int cut = s.indexOf("url=");
+                    if (cut >= 0) {
+                        redirect = s.substring(cut + 4);
+                    }
                 }
-            }
 
-            if (redirect == null) {
-                response = get.getResponseBodyAsString();
-                output.append("Job " + mrJobID + " get status check result.\n");
-                log.debug("Job " + mrJobID + " get status check result.\n");
-            } else {
-                url = redirect;
-                output.append("Job " + mrJobID + " check redirect url " + url + ".\n");
-                log.debug("Job " + mrJobID + " check redirect url " + url + ".\n");
+                if (redirect == null) {
+                    response = get.getResponseBodyAsString();
+                    output.append("Job " + mrJobID + " get status check result.\n");
+                    log.debug("Job " + mrJobID + " get status check result.\n");
+                } else {
+                    url = redirect;
+                    output.append("Job " + mrJobID + " check redirect url " + url + ".\n");
+                    log.debug("Job " + mrJobID + " check redirect url " + url + ".\n");
+                }
+            } finally {
+                get.releaseConnection();
             }
-
-            get.releaseConnection();
         }
 
         return response;
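The HadoopStatusChecker change is a connection-leak fix: with commons-httpclient 3.x, a pooled connection is only returned when releaseConnection() runs, so any exception thrown between executeMethod() and the old unconditional release left the connection stranded. The pattern in isolation, as a sketch assuming the commons-httpclient 3.x API and a placeholder URL:

    import org.apache.commons.httpclient.Header;
    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.methods.GetMethod;

    public class ReleaseOnFailureSketch {
        public static void main(String[] args) throws Exception {
            HttpClient client = new HttpClient();
            // Placeholder URL; the real code targets the YARN status service.
            GetMethod get = new GetMethod("http://localhost:8088/cluster");
            try {
                client.executeMethod(get);
                // A "Refresh: n; url=..." header signals a redirect, as in the patch.
                Header refresh = get.getResponseHeader("Refresh");
                if (refresh == null) {
                    System.out.println(get.getResponseBodyAsString());
                } else {
                    System.out.println("redirected: " + refresh.getValue());
                }
            } finally {
                // Runs even if executeMethod() throws, so the pooled
                // connection is never leaked: the point of the patch.
                get.releaseConnection();
            }
        }
    }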
diff --git a/job/src/main/java/com/kylinolap/job2/common/HadoopCmdOutput.java b/job/src/main/java/com/kylinolap/job2/common/HadoopCmdOutput.java
index ba1d0d3..2be5f87 100644
--- a/job/src/main/java/com/kylinolap/job2/common/HadoopCmdOutput.java
+++ b/job/src/main/java/com/kylinolap/job2/common/HadoopCmdOutput.java
@@ -16,16 +16,22 @@
 package com.kylinolap.job2.common;
 
+import com.kylinolap.job.JobInstance;
 import com.kylinolap.job.constant.JobStepStatusEnum;
 import com.kylinolap.job.exception.JobException;
 import com.kylinolap.job.hadoop.AbstractHadoopJob;
 import com.kylinolap.job.tools.HadoopStatusChecker;
 import com.kylinolap.job2.constants.ExecutableConstants;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * @author xduo
  *
 */
@@ -34,56 +40,34 @@
 
     protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
 
-    private StringBuilder output;
-    private final String yarnUrl;
-    private final AbstractHadoopJob job;
-    private String mrJobID = null;
-    private String trackUrl = null;
+    private final StringBuilder output;
+    private final Job job;
 
-    public HadoopCmdOutput(String yarnUrl, AbstractHadoopJob job) {
+    public HadoopCmdOutput(Job job, StringBuilder output) {
         super();
-        this.yarnUrl = yarnUrl;
         this.job = job;
-        this.output = new StringBuilder();
-    }
-
-    public JobStepStatusEnum getStatus() {
-        getTrackUrl();
-        getMrJobId();
-        final JobStepStatusEnum jobStepStatusEnum = new HadoopStatusChecker(this.yarnUrl, this.mrJobID, output).checkStatus();
-        if (jobStepStatusEnum.isComplete()) {
-            updateJobCounter();
-        }
-        return jobStepStatusEnum;
-    }
-
-    public String getOutput() {
-        return output.toString();
+        this.output = output;
     }
 
     public String getMrJobId() {
-        try {
-            if (mrJobID == null) {
-                mrJobID = job.getInfo().get(ExecutableConstants.MR_JOB_ID);
-            }
-            return mrJobID;
-        } catch (JobException e) {
-            throw new RuntimeException(e);
-        }
+        return getInfo().get(ExecutableConstants.MR_JOB_ID);
    }
 
-    public String getTrackUrl() {
-        try {
-            if (trackUrl == null) {
-                trackUrl = job.getInfo().get(ExecutableConstants.YARN_APP_URL);
+    public Map<String, String> getInfo() {
+        if (job != null) {
+            Map<String, String> status = new HashMap<String, String>();
+            if (null != job.getJobID()) {
+                status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
+            }
+            if (null != job.getTrackingURL()) {
+                status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString());
             }
-            return trackUrl;
-        } catch (JobException e) {
-            throw new RuntimeException(e);
+            return status;
+        } else {
+            return Collections.emptyMap();
         }
     }
 
-
     private String mapInputRecords;
     private String hdfsBytesWritten;
 
@@ -95,11 +79,11 @@ public String getHdfsBytesWritten() {
         return hdfsBytesWritten;
     }
 
-    private void updateJobCounter() {
+    public void updateJobCounter() {
         try {
             Counters counters = job.getCounters();
             if (counters == null) {
-                String errorMsg = "no counters for job " + mrJobID;
+                String errorMsg = "no counters for job " + getMrJobId();
                 log.warn(errorMsg);
                 output.append(errorMsg);
                 return;
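HadoopCmdOutput is reduced here from a poller to a thin adapter over a live org.apache.hadoop.mapreduce.Job handle; status checking moves out to HadoopStatusChecker, and this class now only extracts the MR job ID and tracking URL. A sketch of the same null-safe extraction, assuming the Hadoop 2.x Job API (where these fields can be unset until the job is actually submitted, which is what the patch's null checks guard against) and with literal keys standing in for ExecutableConstants:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.mapreduce.Job;

    public class JobInfoSketch {
        // Same shape as HadoopCmdOutput.getInfo(): tolerate a null job handle
        // and fields that are unset before submission.
        static Map<String, String> extractInfo(Job job) {
            if (job == null) {
                return Collections.emptyMap();
            }
            Map<String, String> status = new HashMap<String, String>();
            if (job.getJobID() != null) {        // assigned on submission
                status.put("mr_job_id", job.getJobID().toString());
            }
            if (job.getTrackingURL() != null) {  // assigned once the RM accepts the job
                status.put("yarn_application_tracking_url", job.getTrackingURL());
            }
            return status;
        }
    }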
diff --git a/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java b/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java
index 4fd782e..9180879 100644
--- a/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java
+++ b/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java
@@ -1,8 +1,10 @@
 package com.kylinolap.job2.common;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.kylinolap.job.constant.JobStepStatusEnum;
 import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.job.tools.HadoopStatusChecker;
 import com.kylinolap.job2.constants.ExecutableConstants;
 import com.kylinolap.job2.dao.JobPO;
 import com.kylinolap.job2.exception.ExecuteException;
@@ -10,6 +12,10 @@
 import com.kylinolap.job2.execution.ExecutableState;
 import com.kylinolap.job2.execution.ExecuteResult;
 import com.kylinolap.job2.impl.threadpool.AbstractExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.util.ToolRunner;
 
 import java.lang.reflect.Constructor;
@@ -32,44 +38,66 @@ public MapReduceExecutable(JobPO job) {
     }
 
     @Override
+    protected void onExecuteStart(ExecutableContext executableContext) {
+        if (!jobService.getOutput(getId()).getExtra().containsKey(START_TIME)) {
+            Map<String, String> info = Maps.newHashMap();
+            info.put(START_TIME, Long.toString(System.currentTimeMillis()));
+            jobService.updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+        } else {
+            jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+        }
+    }
+
+    @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final String mapReduceJobClass = getMapReduceJobClass();
         String params = getMapReduceParams();
         Preconditions.checkNotNull(mapReduceJobClass);
         Preconditions.checkNotNull(params);
         try {
-            final Constructor<? extends AbstractHadoopJob> constructor = (Constructor<? extends AbstractHadoopJob>) Class.forName(mapReduceJobClass).getConstructor();
-            final AbstractHadoopJob job = constructor.newInstance();
-            job.setAsync(true);
-            String[] args = params.trim().split("\\s+");
-            ToolRunner.run(job, args);
-
-            final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(context.getConfig().getYarnStatusServiceUrl(), job);
+            Job job;
+            final Map<String, String> extra = jobService.getOutput(getId()).getExtra();
+            if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
+                logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID));
+                job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
+            } else {
+                final Constructor<? extends AbstractHadoopJob> constructor = (Constructor<? extends AbstractHadoopJob>) Class.forName(mapReduceJobClass).getConstructor();
+                final AbstractHadoopJob hadoopJob = constructor.newInstance();
+                hadoopJob.setAsync(true);
+                String[] args = params.trim().split("\\s+");
+                ToolRunner.run(hadoopJob, args);
+                job = hadoopJob.getJob();
+            }
+            final StringBuilder output = new StringBuilder();
+            final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
+            String mrJobId = hadoopCmdOutput.getMrJobId();
+            HadoopStatusChecker statusChecker = new HadoopStatusChecker(context.getConfig().getYarnStatusServiceUrl(), mrJobId, output);
             JobStepStatusEnum status = JobStepStatusEnum.NEW;
             do {
-                JobStepStatusEnum newStatus = hadoopCmdOutput.getStatus();
+                JobStepStatusEnum newStatus = statusChecker.checkStatus();
                 if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
                     final long waitTime = System.currentTimeMillis() - getStartTime();
                     addExtraInfo(MAP_REDUCE_WAIT_TIME, Long.toString(waitTime));
                 }
                 status = newStatus;
-                jobService.addJobInfo(getId(), job.getInfo());
+                jobService.addJobInfo(getId(), hadoopCmdOutput.getInfo());
                 if (status.isComplete()) {
-                    final Map<String, String> info = job.getInfo();
+                    hadoopCmdOutput.updateJobCounter();
+                    final Map<String, String> info = hadoopCmdOutput.getInfo();
                     info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
                     info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
                     jobService.addJobInfo(getId(), info);
 
                     if (status == JobStepStatusEnum.FINISHED) {
-                        return new ExecuteResult(ExecuteResult.State.SUCCEED, hadoopCmdOutput.getOutput());
+                        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
                     } else {
-                        return new ExecuteResult(ExecuteResult.State.FAILED, hadoopCmdOutput.getOutput());
+                        return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
                    }
                 }
                 Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
             } while (!isStopped());
 
-            return new ExecuteResult(ExecuteResult.State.STOPPED, hadoopCmdOutput.getOutput());
+            return new ExecuteResult(ExecuteResult.State.STOPPED, output.toString());
 
         } catch (ReflectiveOperationException e) {
             logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
diff --git a/job/src/main/java/com/kylinolap/job2/impl/threadpool/AbstractExecutable.java b/job/src/main/java/com/kylinolap/job2/impl/threadpool/AbstractExecutable.java
index c26eb72..64ce046 100644
--- a/job/src/main/java/com/kylinolap/job2/impl/threadpool/AbstractExecutable.java
+++ b/job/src/main/java/com/kylinolap/job2/impl/threadpool/AbstractExecutable.java
@@ -20,9 +20,9 @@
  */
 public abstract class AbstractExecutable implements Executable, Idempotent {
 
-    private static final String SUBMITTER = "submitter";
-    private static final String START_TIME = "startTime";
-    private static final String END_TIME = "endTime";
+    protected static final String SUBMITTER = "submitter";
+    protected static final String START_TIME = "startTime";
+    protected static final String END_TIME = "endTime";
 
     private JobPO job;
     protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
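The resume mechanics rest on two pieces of persisted state: MR_JOB_ID, written into the step's extra map while the job runs, and START_TIME, which onExecuteStart now writes only on the first start so that wait-time accounting survives a scheduler restart. On restart, the executable reattaches to the still-running job through the cluster instead of submitting a duplicate. A sketch of that reattach path, assuming the Hadoop 2.x Cluster API; the job ID literal is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobID;

    public class ResumeSketch {
        // Reattach to a live MapReduce job by the ID persisted in the step's
        // extra map, instead of submitting a duplicate job. Returns null if
        // the cluster no longer knows the ID (e.g. after an RM restart).
        static Job reattach(String mrJobId) throws Exception {
            Cluster cluster = new Cluster(new Configuration());
            return cluster.getJob(JobID.forName(mrJobId));
        }

        public static void main(String[] args) throws Exception {
            // Hypothetical persisted value; the real code reads it via
            // jobService.getOutput(id).getExtra().get(ExecutableConstants.MR_JOB_ID).
            Job job = reattach("job_1420700000000_0001");
            if (job != null) {
                System.out.println("resumed " + job.getJobID() + ", complete=" + job.isComplete());
            }
        }
    }

Making START_TIME, SUBMITTER, and END_TIME protected in AbstractExecutable is what lets the subclass perform this write-once check at all.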
From 8e1328cf2c459678329dc1058090cc16994c6330 Mon Sep 17 00:00:00 2001
From: "qianhao.zhou"
Date: Thu, 8 Jan 2015 14:54:55 +0800
Subject: [PATCH 3/3] add log for resuming mr job

---
 job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java b/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java
index 9180879..de799fe 100644
--- a/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java
+++ b/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java
@@ -58,8 +58,8 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
             Job job;
             final Map<String, String> extra = jobService.getOutput(getId()).getExtra();
             if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
-                logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID));
                 job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
+                logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID) + " resumed");
             } else {
                 final Constructor<? extends AbstractHadoopJob> constructor = (Constructor<? extends AbstractHadoopJob>) Class.forName(mapReduceJobClass).getConstructor();
                 final AbstractHadoopJob hadoopJob = constructor.newInstance();
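With the resume log repositioned after the reattach succeeds, the polling contract the series leaves behind reads as a small state machine: check status, record the WAITING-to-RUNNING transition once for wait-time accounting, persist counters on completion, otherwise sleep and retry until stopped. A compressed, runnable rendering of that control flow; the StatusChecker interface is a stand-in for Kylin's HadoopStatusChecker, and the scripted sequence replaces real YARN responses:

    public class PollLoopSketch {

        enum StepStatus {
            NEW, WAITING, RUNNING, FINISHED, ERROR;

            boolean isComplete() { return this == FINISHED || this == ERROR; }
        }

        // Stand-in for HadoopStatusChecker.checkStatus(); the real one polls
        // the YARN status service over HTTP.
        interface StatusChecker {
            StepStatus checkStatus();
        }

        static StepStatus poll(StatusChecker checker, long intervalMillis) throws InterruptedException {
            StepStatus status = StepStatus.NEW;
            do {
                StepStatus newStatus = checker.checkStatus();
                if (status == StepStatus.WAITING && (newStatus == StepStatus.RUNNING || newStatus.isComplete())) {
                    // MapReduceExecutable records the elapsed wait here,
                    // exactly once, as MAP_REDUCE_WAIT_TIME.
                    System.out.println("left WAITING");
                }
                status = newStatus;
                if (status.isComplete()) {
                    // The real code flushes counters and job info before returning.
                    return status;
                }
                Thread.sleep(intervalMillis);
            } while (true); // the real loop also exits early via isStopped()
        }

        public static void main(String[] args) throws InterruptedException {
            final StepStatus[] script = {StepStatus.WAITING, StepStatus.WAITING, StepStatus.RUNNING, StepStatus.FINISHED};
            StatusChecker scripted = new StatusChecker() {
                private int i = 0;
                public StepStatus checkStatus() { return script[Math.min(i++, script.length - 1)]; }
            };
            System.out.println("final status: " + poll(scripted, 10));
        }
    }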