From 4fcba7d7ef859e9e38d6cd9559c3702f25f3a38b Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Wed, 26 Nov 2014 20:01:40 +0800 Subject: [PATCH 1/2] refactor to be compatible to old segment --- .../main/java/com/kylinolap/job/JobInstanceBuilder.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java index 95bd445..bc6a0e4 100644 --- a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java +++ b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.mapred.jobcontrol.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,8 +154,7 @@ private String getRowkeyDistributionOutputPath() { String[] cuboidPaths = new String[mergingSegments.size()]; for (int i = 0; i < mergingSegments.size(); i++) { - CubeSegment seg = mergingSegments.get(i); - cuboidPaths[i] = JobInstance.getJobWorkingDir(seg.getUuid(), engineConfig.getHdfsWorkingDirectory()) + "/" + jobInstance.getRelatedCube() + "/cuboid/*"; + cuboidPaths[i] = getPathToMerge(jobInstance, mergingSegments.get(i)); } String formattedPath = formatPaths(cuboidPaths); @@ -228,8 +228,7 @@ private String getRowkeyDistributionOutputPath() { if (incBuildMerge) { List pathToMerge = Lists.newArrayList(); for (CubeSegment segment: cube.getSegments(CubeSegmentStatusEnum.READY)) { - String path = JobInstance.getJobWorkingDir(segment.getUuid(), engineConfig.getHdfsWorkingDirectory()) + "/" + jobInstance.getRelatedCube() + "/cuboid/*"; - pathToMerge.add(path); + pathToMerge.add(getPathToMerge(jobInstance, segment)); } pathToMerge.add(cuboidTmpRootPath + "*"); result.add(createMergeCuboidDataStep(jobInstance, stepSeqNum++, formatPaths(pathToMerge), cuboidRootPath)); @@ -254,6 +253,14 @@ private String getRowkeyDistributionOutputPath() { return result; } + private String getPathToMerge(JobInstance jobInstance, CubeSegment segment) { + String uuid = segment.getUuid(); + if (uuid == null) { + uuid = segment.getLastBuildJobID(); + } + return JobInstance.getJobWorkingDir(uuid, engineConfig.getHdfsWorkingDirectory()) + "/" + jobInstance.getRelatedCube() + "/cuboid/*"; + } + private String formatPaths(String[] paths) { return StringUtils.join(paths, ","); } From 5528f15ac0b48de289f2cbfdd37c182794e38ad4 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Wed, 26 Nov 2014 20:15:48 +0800 Subject: [PATCH 2/2] refactor --- job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java | 1 + job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java | 4 +++- server/src/main/java/com/kylinolap/rest/service/JobService.java | 2 ++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java index eea0d0b..4db57b8 100644 --- a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java +++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java @@ -294,6 +294,7 @@ private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineCo } cubeMgr.updateSegmentOnJobSucceed(cubeInstance, jobInstance.getType(), jobInstance.getRelatedSegment(), jobInstance.getUuid(), jobInstance.getExecEndTime(), cubeSize, sourceCount, sourceSize); + log.info("Update cube segment succeed" + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube()); } } diff --git a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java index bb4038d..4af5932 100644 --- a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java +++ b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java @@ -241,9 +241,11 @@ protected void waitCubeBuilt(List jobs) throws Exception { String uuid = seg.getUuid(); jobUuids.add(uuid); jobs.add(jobManager.createJob(cubename, seg.getName(), uuid, jobType)); - // submit job to store + seg.setLastBuildJobID(uuid); } + cubeMgr.updateCube(cube); for (JobInstance job: jobs) { + // submit job to store jobManager.submitJob(job); } return jobUuids; diff --git a/server/src/main/java/com/kylinolap/rest/service/JobService.java b/server/src/main/java/com/kylinolap/rest/service/JobService.java index dc1252b..125d732 100644 --- a/server/src/main/java/com/kylinolap/rest/service/JobService.java +++ b/server/src/main/java/com/kylinolap/rest/service/JobService.java @@ -112,8 +112,10 @@ public String submitJob(CubeInstance cube, long startDate, long endDate, CubeBui for (CubeSegment segment : cubeSegments) { uuid = segment.getUuid(); JobInstance job = this.getJobManager().createJob(cube.getName(), segment.getName(), segment.getUuid(), buildType); + segment.setLastBuildJobID(uuid); jobs.add(job); } + getCubeManager().updateCube(cube); for (JobInstance job: jobs) { this.getJobManager().submitJob(job); permissionService.init(job, null);