From 08b70648664999b663e8915b7e355fba911358a8 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Fri, 28 Nov 2014 09:51:35 +0800 Subject: [PATCH 1/2] Revert "fix CI issue" This reverts commit e8db0faa8de247e53f2f1b957c963c3021dc5379. --- .../main/java/com/kylinolap/cube/CubeInstance.java | 33 +-------------- .../main/java/com/kylinolap/cube/CubeManager.java | 47 +++++++++------------- .../com/kylinolap/cube/CubeSegmentValidator.java | 26 ++++++------ .../java/com/kylinolap/job/JobInstanceBuilder.java | 2 +- .../java/com/kylinolap/job/JoinedFlatTable.java | 4 +- .../com/kylinolap/job/flow/JobFlowListener.java | 13 +++--- .../com/kylinolap/job/BuildCubeWithEngineTest.java | 2 + .../com/kylinolap/rest/service/JobService.java | 2 + 8 files changed, 47 insertions(+), 82 deletions(-) diff --git a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java index 18f28f4..b75a8a5 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java @@ -31,7 +31,6 @@ import com.kylinolap.common.persistence.RootPersistentEntity; import com.kylinolap.metadata.MetadataManager; import com.kylinolap.metadata.model.cube.CubeDesc; -import com.kylinolap.metadata.model.cube.CubePartitionDesc; import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) @@ -384,36 +383,8 @@ public void setCreateTime(String createTime) { return new long[]{start, end}; } - - public boolean needMergeImmediately(CubeSegment segment) { - if (segment == null) { - return false; - } - if (!(segment.getStatus() == CubeSegmentStatusEnum.NEW)) { - return false; - } - return needMergeImmediately(segment.getDateRangeStart(), segment.getDateRangeEnd()); - } - - public boolean needMergeImmediately(long newSegmentRangeStart, long newSegmentRangeEnd) { - if (this.getDescriptor().getCubePartitionDesc().getCubePartitionType() != CubePartitionDesc.CubePartitionType.APPEND) { - return false; - } - if (!getDescriptor().hasHolisticCountDistinctMeasures()) { - return false; - } - List readySegments = getSegments(CubeSegmentStatusEnum.READY); - if (readySegments.isEmpty()) { - return false; - } - for (CubeSegment readySegment : readySegments) { - if (readySegment.getDateRangeStart() == newSegmentRangeStart - && readySegment.getDateRangeEnd() == newSegmentRangeEnd) { - //refresh - return false; - } - } - return true; + public boolean incrementalBuildOnHll() { + return (!getSegment(CubeSegmentStatusEnum.READY).isEmpty()) && getDescriptor().hasHolisticCountDistinctMeasures(); } } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeManager.java b/cube/src/main/java/com/kylinolap/cube/CubeManager.java index 8a4c2fb..557180a 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeManager.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeManager.java @@ -20,7 +20,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import com.google.common.base.Preconditions; import com.kylinolap.dict.DateStrDictionary; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -271,10 +270,10 @@ public CubeInstance updateCube(CubeInstance cube) throws IOException { } List segments = new ArrayList(); - boolean needMergeImmediately = cubeInstance.needMergeImmediately(startDate, endDate); if (null != cubeInstance.getDescriptor().getCubePartitionDesc().getPartitionDateColumn()) { - if (needMergeImmediately) { - segments.add(buildSegment(cubeInstance, startDate, endDate)); + if (cubeInstance.incrementalBuildOnHll()) { + long[] dateRange = cubeInstance.getDateRange(); + segments.add(buildSegment(cubeInstance, dateRange[0], endDate)); } else { if (startDate == 0 && cubeInstance.getSegments().size() == 0) { @@ -307,16 +306,9 @@ public CubeInstance updateCube(CubeInstance cube) throws IOException { validateNewSegments(cubeInstance, buildType, segments); - if (buildType == CubeBuildTypeEnum.MERGE) { - CubeSegment newSeg = segments.get(0); - List mergingSegments = cubeInstance.getMergingSegments(newSeg); - this.makeDictForNewSegment(cubeInstance, newSeg, mergingSegments); - this.makeSnapshotForNewSegment(newSeg, mergingSegments); - } else if (needMergeImmediately) { - CubeSegment newSeg = segments.get(0); - List mergingSegments = cubeInstance.getSegment(CubeSegmentStatusEnum.READY); - this.makeDictForNewSegment(cubeInstance, newSeg, mergingSegments); - this.makeSnapshotForNewSegment(newSeg, mergingSegments); + if (buildType == CubeBuildTypeEnum.MERGE || cubeInstance.incrementalBuildOnHll()) { + this.makeDictForNewSegment(cubeInstance, segments.get(0)); + this.makeSnapshotForNewSegment(cubeInstance, segments.get(0)); } cubeInstance.getSegments().addAll(segments); @@ -340,20 +332,15 @@ public static String getHtableMetadataKey() { return "KYLIN_HOST"; } - public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEnum buildType, String segmentName, String jobUuid, long lastBuildTime, long sizeKB, long sourceRecordCount, long sourceRecordsSize) throws IOException, CubeIntegrityException { + public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEnum buildType, String segmentName, String lastBuildJobUuid, long lastBuildTime, long sizeKB, long sourceRecordCount, long sourceRecordsSize) throws IOException, CubeIntegrityException { List segmentsInNewStatus = cubeInstance.getSegments(CubeSegmentStatusEnum.NEW); - CubeSegment cubeSegment = cubeInstance.getSegmentById(jobUuid); - if (cubeSegment == null) { - cubeSegment = cubeInstance.getSegment(segmentName, CubeSegmentStatusEnum.NEW); - } - Preconditions.checkNotNull(cubeSegment); - Preconditions.checkArgument(cubeSegment.getStatus() == CubeSegmentStatusEnum.NEW, "invalid status of Segment:" + cubeSegment); + CubeSegment cubeSegment = cubeInstance.getSegment(segmentName, CubeSegmentStatusEnum.NEW); switch (buildType) { case BUILD: - if (cubeInstance.needMergeImmediately(cubeSegment)) { - cubeInstance.getSegments().removeAll(cubeInstance.getSegment(CubeSegmentStatusEnum.READY)); + if (cubeInstance.incrementalBuildOnHll()) { + cubeInstance.getSegments().removeAll(cubeInstance.getMergingSegments()); } else { if (segmentsInNewStatus.size() == 1) {// if this the last segment in // status of NEW @@ -367,7 +354,7 @@ public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEn break; } - cubeSegment.setLastBuildJobID(jobUuid); + cubeSegment.setLastBuildJobID(lastBuildJobUuid); cubeSegment.setLastBuildTime(lastBuildTime); cubeSegment.setSizeKB(sizeKB); cubeSegment.setSourceRecords(sourceRecordCount); @@ -458,16 +445,17 @@ public LookupStringTable getLookupTable(CubeSegment cubeSegment, DimensionDesc d * @param newSeg * @throws IOException */ - private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg, List mergingSegments) throws IOException { + private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg) throws IOException { + List mergingSegments = cube.getMergingSegments(newSeg); + HashSet colsNeedMeringDict = new HashSet(); HashSet colsNeedCopyDict = new HashSet(); DictionaryManager dictMgr = this.getDictionaryManager(); - CubeDesc descriptor = cube.getDescriptor(); - for (DimensionDesc dim : descriptor.getDimensions()) { + for (DimensionDesc dim : cube.getDescriptor().getDimensions()) { for (TblColRef col : dim.getColumnRefs()) { if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) { - if (descriptor.getFactTable().equalsIgnoreCase((String) dictMgr.decideSourceData(descriptor, col, null)[0])) { + if (cube.getDescriptor().getFactTable().equalsIgnoreCase((String) dictMgr.decideSourceData(cube.getDescriptor(), col, null)[0])) { colsNeedMeringDict.add(col); } else { colsNeedCopyDict.add(col); @@ -501,7 +489,8 @@ private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg, List mergingSegments) { + private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg) { + List mergingSegments = cube.getMergingSegments(newSeg); for (Map.Entry entry : mergingSegments.get(0).getSnapshots().entrySet()) { newSeg.putSnapshotResPath(entry.getKey(), entry.getValue()); } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java index 32166ad..5a3c6d8 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java @@ -55,8 +55,15 @@ void validate(CubeInstance cubeInstance, List newSegments) throws C } public static class MergeOperationValidator extends CubeSegmentValidator { - private void checkContingency(CubeInstance cubeInstance, CubeSegment newSegment) throws CubeIntegrityException { + private void checkContingency(CubeInstance cubeInstance, List newSegments) throws CubeIntegrityException { + if (cubeInstance.getSegments().size() < 2) { + throw new CubeIntegrityException("No segments to merge."); + } + if (newSegments.size() != 1) { + throw new CubeIntegrityException("Invalid date range."); + } + CubeSegment newSegment = newSegments.get(0); CubeSegment startSeg = null; CubeSegment endSeg = null; for (CubeSegment segment : cubeInstance.getSegments()) { @@ -73,7 +80,9 @@ private void checkContingency(CubeInstance cubeInstance, CubeSegment newSegment) } } - private void checkLoopTableConsistency(CubeInstance cube, CubeSegment cubeSeg) throws CubeIntegrityException { + private void checkLoopTableConsistency(CubeInstance cube, List newSegments) throws CubeIntegrityException { + + CubeSegment cubeSeg = newSegments.get(0); DictionaryManager dictMgr = DictionaryManager.getInstance(cube.getConfig()); List segmentList = cube.getMergingSegments(cubeSeg); @@ -127,15 +136,8 @@ private void checkLoopTableConsistency(CubeInstance cube, CubeSegment cubeSeg) t @Override public void validate(CubeInstance cubeInstance, List newSegments) throws CubeIntegrityException { - if (cubeInstance.getSegments().size() < 2) { - throw new CubeIntegrityException("No segments to merge."); - } - if (newSegments.size() != 1) { - throw new CubeIntegrityException("Invalid date range."); - } - CubeSegment newSegment = newSegments.get(0); - this.checkContingency(cubeInstance, newSegment); - this.checkLoopTableConsistency(cubeInstance, newSegment); + this.checkContingency(cubeInstance, newSegments); + this.checkLoopTableConsistency(cubeInstance, newSegments); } } @@ -152,7 +154,7 @@ void validate(CubeInstance cubeInstance, List newSegments) throws C if (newSegments.size() != 1) { throw new CubeIntegrityException("Invalid date range."); } - if (cubeInstance.needMergeImmediately(newSegments.get(0))) { + if (cubeInstance.incrementalBuildOnHll()) { } else { CubeSegment newSegment = newSegments.get(0); diff --git a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java index 6c0e745..bc6a0e4 100644 --- a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java +++ b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java @@ -213,7 +213,7 @@ private String getRowkeyDistributionOutputPath() { final String cuboidRootPath = jobWorkingDir + "/" + cubeName + "/cuboid/"; final String cuboidTmpRootPath = jobWorkingDir + "/" + cubeName + "/tmp_cuboid/"; - final boolean incBuildMerge = cube.needMergeImmediately(cube.getSegmentById(jobInstance.getUuid())); + final boolean incBuildMerge = cube.incrementalBuildOnHll(); String[] cuboidOutputTempPath = getCuboidOutputPaths(incBuildMerge?cuboidTmpRootPath:cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount); // base cuboid step diff --git a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java index 996f99e..053dc5b 100644 --- a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java +++ b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java @@ -27,7 +27,6 @@ import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; -import com.kylinolap.cube.CubeInstance; import org.w3c.dom.Document; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; @@ -188,6 +187,9 @@ private static void appendWhereStatement(JoinedFlatTableDesc intermediateTableDe long dateStart = cubeSegment.getDateRangeStart(); long dateEnd = cubeSegment.getDateRangeEnd(); + if (cubeSegment.getCubeInstance().incrementalBuildOnHll()) { + dateStart = cubeSegment.getCubeInstance().getDateRange()[1]; + } if (!(dateStart == 0 && dateEnd == 0)) { String partitionColumnName = cubeDesc.getCubePartitionDesc().getPartitionDateColumn(); diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java index a571828..4db57b8 100644 --- a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java +++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import com.kylinolap.cube.CubeSegmentStatusEnum; import org.apache.commons.lang.exception.ExceptionUtils; import org.quartz.JobDataMap; import org.quartz.JobDetail; @@ -278,13 +277,11 @@ private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineCo } else { log.info("No step with name '" + JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE + "' is found"); } - CubeSegment segmentById = cubeInstance.getSegmentById(jobInstance.getUuid()); - if (cubeInstance.needMergeImmediately(segmentById)) { - for (CubeSegment seg : cubeInstance.getSegment(CubeSegmentStatusEnum.READY)) { - if (seg.getDateRangeEnd() >= segmentById.getDateRangeStart()) { - sourceCount += seg.getSourceRecords(); - sourceSize += seg.getSourceRecordsSize(); - } + + if (cubeInstance.incrementalBuildOnHll()) { + for (CubeSegment seg : cubeInstance.getMergingSegments()) { + sourceCount += seg.getSourceRecords(); + sourceSize += seg.getSourceRecordsSize(); } } break; diff --git a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java index 584162e..4af5932 100644 --- a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java +++ b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java @@ -241,7 +241,9 @@ protected void waitCubeBuilt(List jobs) throws Exception { String uuid = seg.getUuid(); jobUuids.add(uuid); jobs.add(jobManager.createJob(cubename, seg.getName(), uuid, jobType)); + seg.setLastBuildJobID(uuid); } + cubeMgr.updateCube(cube); for (JobInstance job: jobs) { // submit job to store jobManager.submitJob(job); diff --git a/server/src/main/java/com/kylinolap/rest/service/JobService.java b/server/src/main/java/com/kylinolap/rest/service/JobService.java index dc1252b..125d732 100644 --- a/server/src/main/java/com/kylinolap/rest/service/JobService.java +++ b/server/src/main/java/com/kylinolap/rest/service/JobService.java @@ -112,8 +112,10 @@ public String submitJob(CubeInstance cube, long startDate, long endDate, CubeBui for (CubeSegment segment : cubeSegments) { uuid = segment.getUuid(); JobInstance job = this.getJobManager().createJob(cube.getName(), segment.getName(), segment.getUuid(), buildType); + segment.setLastBuildJobID(uuid); jobs.add(job); } + getCubeManager().updateCube(cube); for (JobInstance job: jobs) { this.getJobManager().submitJob(job); permissionService.init(job, null); From 55272ab6060b48044cd7e87434396f2569d21b32 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Fri, 28 Nov 2014 10:57:02 +0800 Subject: [PATCH 2/2] fix --- .../main/java/com/kylinolap/cube/CubeInstance.java | 50 +++++++++++++++++++++- .../main/java/com/kylinolap/cube/CubeManager.java | 33 ++++++++------ .../main/java/com/kylinolap/cube/CubeSegment.java | 1 + .../com/kylinolap/cube/CubeSegmentValidator.java | 4 +- .../java/com/kylinolap/job/JobInstanceBuilder.java | 3 +- .../java/com/kylinolap/job/JoinedFlatTable.java | 2 +- .../com/kylinolap/job/flow/JobFlowListener.java | 2 +- 7 files changed, 74 insertions(+), 21 deletions(-) diff --git a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java index b75a8a5..271a8e7 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java @@ -25,12 +25,14 @@ import com.fasterxml.jackson.annotation.JsonManagedReference; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.kylinolap.common.KylinConfig; import com.kylinolap.common.persistence.ResourceStore; import com.kylinolap.common.persistence.RootPersistentEntity; import com.kylinolap.metadata.MetadataManager; import com.kylinolap.metadata.model.cube.CubeDesc; +import com.kylinolap.metadata.model.cube.CubePartitionDesc; import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) @@ -383,8 +385,52 @@ public void setCreateTime(String createTime) { return new long[]{start, end}; } - public boolean incrementalBuildOnHll() { - return (!getSegment(CubeSegmentStatusEnum.READY).isEmpty()) && getDescriptor().hasHolisticCountDistinctMeasures(); + private boolean appendOnHll() { + CubePartitionDesc cubePartitionDesc = getDescriptor().getCubePartitionDesc(); + if (cubePartitionDesc == null) { + return false; + } + if (cubePartitionDesc.getPartitionDateColumn() == null) { + return false; + } + if (cubePartitionDesc.getCubePartitionType() != CubePartitionDesc.CubePartitionType.APPEND) { + return false; + } + return getDescriptor().hasHolisticCountDistinctMeasures(); + } + + public boolean appendBuildOnHllMeasure(long startDate, long endDate) { + if (!appendOnHll()) { + return false; + } + List readySegments = getSegment(CubeSegmentStatusEnum.READY); + if (readySegments.isEmpty()) { + return false; + } + for (CubeSegment readySegment: readySegments) { + if (readySegment.getDateRangeStart() == startDate && readySegment.getDateRangeEnd() == endDate) { + //refresh build + return false; + } + } + return true; + } + + public boolean needMergeImmediatelyAfterBuild(CubeSegment segment) { + if (!appendOnHll()) { + return false; + } + List readySegments = getSegment(CubeSegmentStatusEnum.READY); + if (readySegments.isEmpty()) { + return false; + } + for (CubeSegment readySegment: readySegments) { + if (readySegment.getDateRangeEnd() < segment.getDateRangeStart()) { + //has overlap and not refresh + return true; + } + } + return false; } } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeManager.java b/cube/src/main/java/com/kylinolap/cube/CubeManager.java index 557180a..ca4ce65 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeManager.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeManager.java @@ -270,8 +270,9 @@ public CubeInstance updateCube(CubeInstance cube) throws IOException { } List segments = new ArrayList(); + final boolean appendBuildOnHllMeasure = cubeInstance.appendBuildOnHllMeasure(startDate, endDate); if (null != cubeInstance.getDescriptor().getCubePartitionDesc().getPartitionDateColumn()) { - if (cubeInstance.incrementalBuildOnHll()) { + if (appendBuildOnHllMeasure) { long[] dateRange = cubeInstance.getDateRange(); segments.add(buildSegment(cubeInstance, dateRange[0], endDate)); } else { @@ -306,9 +307,15 @@ public CubeInstance updateCube(CubeInstance cube) throws IOException { validateNewSegments(cubeInstance, buildType, segments); - if (buildType == CubeBuildTypeEnum.MERGE || cubeInstance.incrementalBuildOnHll()) { - this.makeDictForNewSegment(cubeInstance, segments.get(0)); - this.makeSnapshotForNewSegment(cubeInstance, segments.get(0)); + CubeSegment newSeg = segments.get(0); + if (buildType == CubeBuildTypeEnum.MERGE) { + List mergingSegments = cubeInstance.getMergingSegments(newSeg); + this.makeDictForNewSegment(cubeInstance, newSeg, mergingSegments); + this.makeSnapshotForNewSegment(cubeInstance, newSeg, mergingSegments); + } else if (appendBuildOnHllMeasure) { + List mergingSegments = cubeInstance.getSegment(CubeSegmentStatusEnum.READY); + this.makeDictForNewSegment(cubeInstance, newSeg, mergingSegments); + this.makeSnapshotForNewSegment(cubeInstance, newSeg, mergingSegments); } cubeInstance.getSegments().addAll(segments); @@ -332,14 +339,17 @@ public static String getHtableMetadataKey() { return "KYLIN_HOST"; } - public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEnum buildType, String segmentName, String lastBuildJobUuid, long lastBuildTime, long sizeKB, long sourceRecordCount, long sourceRecordsSize) throws IOException, CubeIntegrityException { + public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEnum buildType, String segmentName, String jobUuid, long lastBuildTime, long sizeKB, long sourceRecordCount, long sourceRecordsSize) throws IOException, CubeIntegrityException { List segmentsInNewStatus = cubeInstance.getSegments(CubeSegmentStatusEnum.NEW); - CubeSegment cubeSegment = cubeInstance.getSegment(segmentName, CubeSegmentStatusEnum.NEW); + CubeSegment cubeSegment = cubeInstance.getSegmentById(jobUuid); + if (cubeSegment == null) { + cubeSegment = cubeInstance.getSegment(segmentName, CubeSegmentStatusEnum.NEW); + } switch (buildType) { case BUILD: - if (cubeInstance.incrementalBuildOnHll()) { + if (cubeInstance.needMergeImmediatelyAfterBuild(cubeSegment)) { cubeInstance.getSegments().removeAll(cubeInstance.getMergingSegments()); } else { if (segmentsInNewStatus.size() == 1) {// if this the last segment in @@ -354,7 +364,7 @@ public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEn break; } - cubeSegment.setLastBuildJobID(lastBuildJobUuid); + cubeSegment.setLastBuildJobID(jobUuid); cubeSegment.setLastBuildTime(lastBuildTime); cubeSegment.setSizeKB(sizeKB); cubeSegment.setSourceRecords(sourceRecordCount); @@ -445,9 +455,7 @@ public LookupStringTable getLookupTable(CubeSegment cubeSegment, DimensionDesc d * @param newSeg * @throws IOException */ - private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg) throws IOException { - List mergingSegments = cube.getMergingSegments(newSeg); - + private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg, List mergingSegments) throws IOException { HashSet colsNeedMeringDict = new HashSet(); HashSet colsNeedCopyDict = new HashSet(); DictionaryManager dictMgr = this.getDictionaryManager(); @@ -489,8 +497,7 @@ private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg) throws * @param cube * @param newSeg */ - private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg) { - List mergingSegments = cube.getMergingSegments(newSeg); + private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List mergingSegments) { for (Map.Entry entry : mergingSegments.get(0).getSnapshots().entrySet()) { newSeg.putSnapshotResPath(entry.getKey(), entry.getValue()); } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegment.java b/cube/src/main/java/com/kylinolap/cube/CubeSegment.java index a312297..b2a9a96 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeSegment.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeSegment.java @@ -305,6 +305,7 @@ public String toString() { .add("create_time:", createTime) .add("name", name) .add("last_build_job_id", lastBuildJobID) + .add("status", status) .toString(); } } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java index 5a3c6d8..c2ceb25 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java @@ -154,10 +154,10 @@ void validate(CubeInstance cubeInstance, List newSegments) throws C if (newSegments.size() != 1) { throw new CubeIntegrityException("Invalid date range."); } - if (cubeInstance.incrementalBuildOnHll()) { + CubeSegment newSegment = newSegments.get(0); + if (cubeInstance.needMergeImmediatelyAfterBuild(newSegment)) { } else { - CubeSegment newSegment = newSegments.get(0); // check if user will rebuild one specified segment boolean hasMatchSegment = false; for (CubeSegment segment : cubeInstance.getSegments()) { diff --git a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java index bc6a0e4..3a9a77e 100644 --- a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java +++ b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java @@ -20,7 +20,6 @@ import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.mapred.jobcontrol.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -213,7 +212,7 @@ private String getRowkeyDistributionOutputPath() { final String cuboidRootPath = jobWorkingDir + "/" + cubeName + "/cuboid/"; final String cuboidTmpRootPath = jobWorkingDir + "/" + cubeName + "/tmp_cuboid/"; - final boolean incBuildMerge = cube.incrementalBuildOnHll(); + final boolean incBuildMerge = cube.needMergeImmediatelyAfterBuild(); String[] cuboidOutputTempPath = getCuboidOutputPaths(incBuildMerge?cuboidTmpRootPath:cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount); // base cuboid step diff --git a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java index 053dc5b..b2ba4dd 100644 --- a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java +++ b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java @@ -187,7 +187,7 @@ private static void appendWhereStatement(JoinedFlatTableDesc intermediateTableDe long dateStart = cubeSegment.getDateRangeStart(); long dateEnd = cubeSegment.getDateRangeEnd(); - if (cubeSegment.getCubeInstance().incrementalBuildOnHll()) { + if (cubeSegment.getCubeInstance().needMergeImmediatelyAfterBuild()) { dateStart = cubeSegment.getCubeInstance().getDateRange()[1]; } if (!(dateStart == 0 && dateEnd == 0)) { diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java index 4db57b8..ddf8471 100644 --- a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java +++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java @@ -278,7 +278,7 @@ private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineCo log.info("No step with name '" + JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE + "' is found"); } - if (cubeInstance.incrementalBuildOnHll()) { + if (cubeInstance.needMergeImmediatelyAfterBuild()) { for (CubeSegment seg : cubeInstance.getMergingSegments()) { sourceCount += seg.getSourceRecords(); sourceSize += seg.getSourceRecordsSize();