From 9af955c13f2afd527387e778fe72fd4d172b61aa Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Wed, 26 Nov 2014 19:13:23 +0800 Subject: [PATCH] 1. add uuid for CubeSegment 2. use the same uuid for CubeSegment and JobInstance, so that it is easy to mapping between CubeSegment and JobInstance --- cube/src/main/java/com/kylinolap/cube/CubeInstance.java | 16 +++++++++++++--- cube/src/main/java/com/kylinolap/cube/CubeManager.java | 8 ++++---- cube/src/main/java/com/kylinolap/cube/CubeSegment.java | 10 ++++++++++ .../main/java/com/kylinolap/job/JobInstanceBuilder.java | 4 ++-- job/src/main/java/com/kylinolap/job/JobManager.java | 9 ++++----- .../com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java | 7 ++++--- .../com/kylinolap/job/hadoop/cube/StorageCleanupJob.java | 2 +- .../java/com/kylinolap/job/BuildCubeWithEngineTest.java | 3 --- .../main/java/com/kylinolap/rest/service/JobService.java | 4 +--- 9 files changed, 39 insertions(+), 24 deletions(-) diff --git a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java index b2d7886..a6ef83f 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.kylinolap.common.KylinConfig; import com.kylinolap.common.persistence.ResourceStore; @@ -314,7 +315,7 @@ public CubeSegment getLatestReadySegment() { public List getSegments(CubeSegmentStatusEnum status) { List segments = new ArrayList(); - for (CubeSegment segment : this.getSegments()) { + for (CubeSegment segment : segments) { if (segment.getStatus() == status) { segments.add(segment); } @@ -325,7 +326,7 @@ public CubeSegment getLatestReadySegment() { public List getSegment(CubeSegmentStatusEnum status) { List result = Lists.newArrayList(); - for (CubeSegment segment: getSegments()) { + for (CubeSegment segment: segments) { if (segment.getStatus() == status) { result.add(segment); } @@ -334,7 +335,7 @@ public CubeSegment getLatestReadySegment() { } public CubeSegment getSegment(String name, CubeSegmentStatusEnum status) { - for (CubeSegment segment : this.getSegments()) { + for (CubeSegment segment : segments) { if ((null != segment.getName() && segment.getName().equals(name)) && segment.getStatus() == status) { return segment; } @@ -347,6 +348,15 @@ public void setSegments(List segments) { this.segments = segments; } + public CubeSegment getSegmentById(String segmentId) { + for (CubeSegment segment: segments) { + if (Objects.equal(segment.getUuid(), segmentId)) { + return segment; + } + } + return null; + } + public String getCreateTime() { return createTime; } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeManager.java b/cube/src/main/java/com/kylinolap/cube/CubeManager.java index 8add9c5..557180a 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeManager.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeManager.java @@ -191,9 +191,9 @@ public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, String info = dictMgr.getDictionaryInfo(dictResPath); if (info == null) - throw new IllegalStateException("No dictionary found by " + dictResPath + ", invalid cube state; cube segment" + cubeSeg.getName() + ", col " + col); + throw new IllegalStateException("No dictionary found by " + dictResPath + ", invalid cube state; cube segment" + cubeSeg + ", col " + col); } catch (IOException e) { - throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg.getName() + ", col" + col, e); + throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e); } return info.getDictionaryObject(); @@ -418,7 +418,7 @@ public LookupStringTable getLookupTable(CubeSegment cubeSegment, DimensionDesc d if (r == null) { String snapshotResPath = cubeSegment.getSnapshotResPath(tableName); if (snapshotResPath == null) - throw new IllegalStateException("No snaphot for table '" + tableName + "' found on cube segment" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment.getName()); + throw new IllegalStateException("No snaphot for table '" + tableName + "' found on cube segment" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment); try { SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath); @@ -468,7 +468,7 @@ private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg) throws logger.info("Merging fact table dictionary on : " + col); List dictInfos = new ArrayList(); for (CubeSegment segment : mergingSegments) { - logger.info("Including fact table dictionary of segment : " + segment.getName()); + logger.info("Including fact table dictionary of segment : " + segment); DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col)); dictInfos.add(dictInfo); } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegment.java b/cube/src/main/java/com/kylinolap/cube/CubeSegment.java index aa841d8..a312297 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeSegment.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeSegment.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonBackReference; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Objects; import com.kylinolap.metadata.model.cube.CubeDesc; import com.kylinolap.metadata.model.cube.TblColRef; @@ -297,4 +298,13 @@ public boolean equals(Object obj) { return true; } + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("uuid", uuid) + .add("create_time:", createTime) + .add("name", name) + .add("last_build_job_id", lastBuildJobID) + .toString(); + } } diff --git a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java index 7f5a6a8..95bd445 100644 --- a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java +++ b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java @@ -154,7 +154,7 @@ private String getRowkeyDistributionOutputPath() { String[] cuboidPaths = new String[mergingSegments.size()]; for (int i = 0; i < mergingSegments.size(); i++) { CubeSegment seg = mergingSegments.get(i); - cuboidPaths[i] = JobInstance.getJobWorkingDir(seg.getLastBuildJobID(), engineConfig.getHdfsWorkingDirectory()) + "/" + jobInstance.getRelatedCube() + "/cuboid/*"; + cuboidPaths[i] = JobInstance.getJobWorkingDir(seg.getUuid(), engineConfig.getHdfsWorkingDirectory()) + "/" + jobInstance.getRelatedCube() + "/cuboid/*"; } String formattedPath = formatPaths(cuboidPaths); @@ -228,7 +228,7 @@ private String getRowkeyDistributionOutputPath() { if (incBuildMerge) { List pathToMerge = Lists.newArrayList(); for (CubeSegment segment: cube.getSegments(CubeSegmentStatusEnum.READY)) { - String path = JobInstance.getJobWorkingDir(segment.getLastBuildJobID(), engineConfig.getHdfsWorkingDirectory()) + "/" + jobInstance.getRelatedCube() + "/cuboid/*"; + String path = JobInstance.getJobWorkingDir(segment.getUuid(), engineConfig.getHdfsWorkingDirectory()) + "/" + jobInstance.getRelatedCube() + "/cuboid/*"; pathToMerge.add(path); } pathToMerge.add(cuboidTmpRootPath + "*"); diff --git a/job/src/main/java/com/kylinolap/job/JobManager.java b/job/src/main/java/com/kylinolap/job/JobManager.java index a95d94c..00aa8bb 100644 --- a/job/src/main/java/com/kylinolap/job/JobManager.java +++ b/job/src/main/java/com/kylinolap/job/JobManager.java @@ -23,7 +23,6 @@ import java.util.Date; import java.util.List; import java.util.TimeZone; -import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,9 +71,9 @@ public JobManager(String engineID, JobEngineConfig engineCfg) throws JobExceptio this.jobEngine = JobEngine.getInstance(engineID, engineCfg); } - public JobInstance createJob(String cubeName, String segmentName, String uuid, CubeBuildTypeEnum jobType) throws IOException { + public JobInstance createJob(String cubeName, String segmentName, String segmentId, CubeBuildTypeEnum jobType) throws IOException { // build job instance - JobInstance jobInstance = buildJobInstance(cubeName, segmentName, uuid, jobType); + JobInstance jobInstance = buildJobInstance(cubeName, segmentName, segmentId, jobType); // create job steps based on job type JobInstanceBuilder stepBuilder = new JobInstanceBuilder(this.engineConfig); @@ -83,12 +82,12 @@ public JobInstance createJob(String cubeName, String segmentName, String uuid, C return jobInstance; } - private JobInstance buildJobInstance(String cubeName, String segmentName, String uuid, CubeBuildTypeEnum jobType) { + private JobInstance buildJobInstance(String cubeName, String segmentName, String segmentId, CubeBuildTypeEnum jobType) { SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss"); format.setTimeZone(TimeZone.getTimeZone(this.engineConfig.getTimeZone())); JobInstance jobInstance = new JobInstance(); - jobInstance.setUuid(uuid); + jobInstance.setUuid(segmentId); jobInstance.setType(jobType); jobInstance.setRelatedCube(cubeName); jobInstance.setRelatedSegment(segmentName); diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java index ac905a5..627c397 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java @@ -88,10 +88,11 @@ private String extractJobIDFromPath(String path) { } } - private CubeSegment findSegmentWithJobID(String jobID, CubeInstance cubeInstance) { + private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) { for (CubeSegment segment : cubeInstance.getSegments()) { - if (segment.getLastBuildJobID().equalsIgnoreCase(jobID)) + if (segment.getUuid().equalsIgnoreCase(jobID)) { return segment; + } } throw new IllegalStateException("No merging segment's last build job ID equals " + jobID); @@ -117,7 +118,7 @@ protected void setup(Context context) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.InputSplit inputSplit = context.getInputSplit(); String filePath = ((FileSplit) inputSplit).getPath().toString(); String jobID = extractJobIDFromPath(filePath); - sourceCubeSegment = findSegmentWithJobID(jobID, cube); + sourceCubeSegment = findSegmentWithUuid(jobID, cube); this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); } diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java index 3a0b988..51d893c 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java @@ -179,7 +179,7 @@ private void cleanUnusedHdfsFiles(Configuration conf) throws IOException { if (jobUuid != null && jobUuid.equals("") == false) { String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory()); allHdfsPathsNeedToBeDeleted.remove(path); - log.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg.getName() + " of cube " + cube.getName()); + log.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName()); } } } diff --git a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java index 5f031f2..bb4038d 100644 --- a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java +++ b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java @@ -239,13 +239,10 @@ protected void waitCubeBuilt(List jobs) throws Exception { List jobs = Lists.newArrayList(); for (CubeSegment seg : newSegments) { String uuid = seg.getUuid(); - seg.setLastBuildJobID(uuid); - jobUuids.add(uuid); jobs.add(jobManager.createJob(cubename, seg.getName(), uuid, jobType)); // submit job to store } - cubeMgr.updateCube(cube); for (JobInstance job: jobs) { jobManager.submitJob(job); } diff --git a/server/src/main/java/com/kylinolap/rest/service/JobService.java b/server/src/main/java/com/kylinolap/rest/service/JobService.java index a0ba371..dc1252b 100644 --- a/server/src/main/java/com/kylinolap/rest/service/JobService.java +++ b/server/src/main/java/com/kylinolap/rest/service/JobService.java @@ -111,11 +111,9 @@ public String submitJob(CubeInstance cube, long startDate, long endDate, CubeBui List jobs = Lists.newArrayListWithExpectedSize(cubeSegments.size()); for (CubeSegment segment : cubeSegments) { uuid = segment.getUuid(); - JobInstance job = this.getJobManager().createJob(cube.getName(), segment.getName(), uuid, buildType); + JobInstance job = this.getJobManager().createJob(cube.getName(), segment.getName(), segment.getUuid(), buildType); jobs.add(job); - segment.setLastBuildJobID(uuid); } - getCubeManager().updateCube(cube); for (JobInstance job: jobs) { this.getJobManager().submitJob(job); permissionService.init(job, null);