From e8db0faa8de247e53f2f1b957c963c3021dc5379 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Thu, 27 Nov 2014 15:42:58 +0800 Subject: [PATCH 1/2] fix CI issue --- .../main/java/com/kylinolap/cube/CubeInstance.java | 32 ++++++++++++++++++++-- .../main/java/com/kylinolap/cube/CubeManager.java | 10 +++---- .../com/kylinolap/cube/CubeSegmentValidator.java | 2 +- .../java/com/kylinolap/job/JobInstanceBuilder.java | 2 +- .../java/com/kylinolap/job/JoinedFlatTable.java | 4 +-- .../com/kylinolap/job/flow/JobFlowListener.java | 4 +-- .../com/kylinolap/job/BuildCubeWithEngineTest.java | 2 -- .../com/kylinolap/rest/service/JobService.java | 2 -- 8 files changed, 40 insertions(+), 18 deletions(-) diff --git a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java index b75a8a5..e46b668 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java @@ -383,8 +383,36 @@ public void setCreateTime(String createTime) { return new long[]{start, end}; } - public boolean incrementalBuildOnHll() { - return (!getSegment(CubeSegmentStatusEnum.READY).isEmpty()) && getDescriptor().hasHolisticCountDistinctMeasures(); + + public boolean needMergeImmediately(CubeSegment segment) { + if (segment == null) { + return false; + } + if (!(segment.getStatus() == CubeSegmentStatusEnum.NEW)) { + return false; + } + return needMergeImmediately(segment.getDateRangeStart(), segment.getDateRangeEnd()); + } + + public boolean needMergeImmediately(long newSegmentRangeStart, long newSegmentRangeEnd) { + if (!getDescriptor().hasHolisticCountDistinctMeasures()) { + return false; + } + List readySegments = getSegments(CubeSegmentStatusEnum.READY); + if (readySegments.isEmpty()) { + return false; + } + for (CubeSegment readySegment : readySegments) { + if (readySegment.getDateRangeStart() == newSegmentRangeStart + && readySegment.getDateRangeEnd() == newSegmentRangeEnd) { + //refresh + return false; + } + } + if (getDateRange()[1] == newSegmentRangeStart) { + return true; + } + return false; } } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeManager.java b/cube/src/main/java/com/kylinolap/cube/CubeManager.java index 557180a..0780031 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeManager.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeManager.java @@ -270,10 +270,10 @@ public CubeInstance updateCube(CubeInstance cube) throws IOException { } List segments = new ArrayList(); + boolean needMergeImmediately = cubeInstance.needMergeImmediately(startDate, endDate); if (null != cubeInstance.getDescriptor().getCubePartitionDesc().getPartitionDateColumn()) { - if (cubeInstance.incrementalBuildOnHll()) { - long[] dateRange = cubeInstance.getDateRange(); - segments.add(buildSegment(cubeInstance, dateRange[0], endDate)); + if (needMergeImmediately) { + segments.add(buildSegment(cubeInstance, startDate, endDate)); } else { if (startDate == 0 && cubeInstance.getSegments().size() == 0) { @@ -306,7 +306,7 @@ public CubeInstance updateCube(CubeInstance cube) throws IOException { validateNewSegments(cubeInstance, buildType, segments); - if (buildType == CubeBuildTypeEnum.MERGE || cubeInstance.incrementalBuildOnHll()) { + if (buildType == CubeBuildTypeEnum.MERGE || needMergeImmediately) { this.makeDictForNewSegment(cubeInstance, segments.get(0)); this.makeSnapshotForNewSegment(cubeInstance, segments.get(0)); } @@ -339,7 +339,7 @@ public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEn switch (buildType) { case BUILD: - if (cubeInstance.incrementalBuildOnHll()) { + if (cubeInstance.needMergeImmediately(cubeInstance.getSegmentById(lastBuildJobUuid))) { cubeInstance.getSegments().removeAll(cubeInstance.getMergingSegments()); } else { if (segmentsInNewStatus.size() == 1) {// if this the last segment in diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java index 5a3c6d8..5c95c57 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java @@ -154,7 +154,7 @@ void validate(CubeInstance cubeInstance, List newSegments) throws C if (newSegments.size() != 1) { throw new CubeIntegrityException("Invalid date range."); } - if (cubeInstance.incrementalBuildOnHll()) { + if (cubeInstance.needMergeImmediately(newSegments.get(0))) { } else { CubeSegment newSegment = newSegments.get(0); diff --git a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java index bc6a0e4..6c0e745 100644 --- a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java +++ b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java @@ -213,7 +213,7 @@ private String getRowkeyDistributionOutputPath() { final String cuboidRootPath = jobWorkingDir + "/" + cubeName + "/cuboid/"; final String cuboidTmpRootPath = jobWorkingDir + "/" + cubeName + "/tmp_cuboid/"; - final boolean incBuildMerge = cube.incrementalBuildOnHll(); + final boolean incBuildMerge = cube.needMergeImmediately(cube.getSegmentById(jobInstance.getUuid())); String[] cuboidOutputTempPath = getCuboidOutputPaths(incBuildMerge?cuboidTmpRootPath:cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount); // base cuboid step diff --git a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java index 053dc5b..996f99e 100644 --- a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java +++ b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java @@ -27,6 +27,7 @@ import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; +import com.kylinolap.cube.CubeInstance; import org.w3c.dom.Document; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; @@ -187,9 +188,6 @@ private static void appendWhereStatement(JoinedFlatTableDesc intermediateTableDe long dateStart = cubeSegment.getDateRangeStart(); long dateEnd = cubeSegment.getDateRangeEnd(); - if (cubeSegment.getCubeInstance().incrementalBuildOnHll()) { - dateStart = cubeSegment.getCubeInstance().getDateRange()[1]; - } if (!(dateStart == 0 && dateEnd == 0)) { String partitionColumnName = cubeDesc.getCubePartitionDesc().getPartitionDateColumn(); diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java index 4db57b8..600b98b 100644 --- a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java +++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java @@ -277,8 +277,8 @@ private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineCo } else { log.info("No step with name '" + JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE + "' is found"); } - - if (cubeInstance.incrementalBuildOnHll()) { + CubeSegment segmentById = cubeInstance.getSegmentById(jobInstance.getUuid()); + if (cubeInstance.needMergeImmediately(segmentById)) { for (CubeSegment seg : cubeInstance.getMergingSegments()) { sourceCount += seg.getSourceRecords(); sourceSize += seg.getSourceRecordsSize(); diff --git a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java index 4af5932..584162e 100644 --- a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java +++ b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java @@ -241,9 +241,7 @@ protected void waitCubeBuilt(List jobs) throws Exception { String uuid = seg.getUuid(); jobUuids.add(uuid); jobs.add(jobManager.createJob(cubename, seg.getName(), uuid, jobType)); - seg.setLastBuildJobID(uuid); } - cubeMgr.updateCube(cube); for (JobInstance job: jobs) { // submit job to store jobManager.submitJob(job); diff --git a/server/src/main/java/com/kylinolap/rest/service/JobService.java b/server/src/main/java/com/kylinolap/rest/service/JobService.java index 125d732..dc1252b 100644 --- a/server/src/main/java/com/kylinolap/rest/service/JobService.java +++ b/server/src/main/java/com/kylinolap/rest/service/JobService.java @@ -112,10 +112,8 @@ public String submitJob(CubeInstance cube, long startDate, long endDate, CubeBui for (CubeSegment segment : cubeSegments) { uuid = segment.getUuid(); JobInstance job = this.getJobManager().createJob(cube.getName(), segment.getName(), segment.getUuid(), buildType); - segment.setLastBuildJobID(uuid); jobs.add(job); } - getCubeManager().updateCube(cube); for (JobInstance job: jobs) { this.getJobManager().submitJob(job); permissionService.init(job, null); From 5c84eee0a4bab7f5b297bb070cf12d7228bd546b Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Thu, 27 Nov 2014 16:34:20 +0800 Subject: [PATCH 2/2] refactor --- .../main/java/com/kylinolap/cube/CubeInstance.java | 4 ++++ .../com/kylinolap/cube/CubeSegmentValidator.java | 25 +++++++++++----------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java index e46b668..b9bd6be 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java @@ -31,6 +31,7 @@ import com.kylinolap.common.persistence.RootPersistentEntity; import com.kylinolap.metadata.MetadataManager; import com.kylinolap.metadata.model.cube.CubeDesc; +import com.kylinolap.metadata.model.cube.CubePartitionDesc; import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) @@ -395,6 +396,9 @@ public boolean needMergeImmediately(CubeSegment segment) { } public boolean needMergeImmediately(long newSegmentRangeStart, long newSegmentRangeEnd) { + if (this.getDescriptor().getCubePartitionDesc().getCubePartitionType() != CubePartitionDesc.CubePartitionType.APPEND) { + return false; + } if (!getDescriptor().hasHolisticCountDistinctMeasures()) { return false; } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java index 5c95c57..76fa238 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java @@ -55,15 +55,8 @@ void validate(CubeInstance cubeInstance, List newSegments) throws C } public static class MergeOperationValidator extends CubeSegmentValidator { - private void checkContingency(CubeInstance cubeInstance, List newSegments) throws CubeIntegrityException { - if (cubeInstance.getSegments().size() < 2) { - throw new CubeIntegrityException("No segments to merge."); - } - if (newSegments.size() != 1) { - throw new CubeIntegrityException("Invalid date range."); - } + private void checkContingency(CubeInstance cubeInstance, CubeSegment newSegment) throws CubeIntegrityException { - CubeSegment newSegment = newSegments.get(0); CubeSegment startSeg = null; CubeSegment endSeg = null; for (CubeSegment segment : cubeInstance.getSegments()) { @@ -80,9 +73,8 @@ private void checkContingency(CubeInstance cubeInstance, List newSe } } - private void checkLoopTableConsistency(CubeInstance cube, List newSegments) throws CubeIntegrityException { - - CubeSegment cubeSeg = newSegments.get(0); + private void checkLoopTableConsistency(CubeInstance cube, CubeSegment newSegment) throws CubeIntegrityException { + CubeSegment cubeSeg = newSegment; DictionaryManager dictMgr = DictionaryManager.getInstance(cube.getConfig()); List segmentList = cube.getMergingSegments(cubeSeg); @@ -136,8 +128,15 @@ private void checkLoopTableConsistency(CubeInstance cube, List newS @Override public void validate(CubeInstance cubeInstance, List newSegments) throws CubeIntegrityException { - this.checkContingency(cubeInstance, newSegments); - this.checkLoopTableConsistency(cubeInstance, newSegments); + if (cubeInstance.getSegments().size() < 2) { + throw new CubeIntegrityException("No segments to merge."); + } + if (newSegments.size() != 1) { + throw new CubeIntegrityException("Invalid date range."); + } + CubeSegment newSegment = newSegments.get(0); + this.checkContingency(cubeInstance, newSegment); + this.checkLoopTableConsistency(cubeInstance, newSegment); } }