From 1856c7aa599083564be695a513d97cdb300b7ca8 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Thu, 27 Nov 2014 19:05:59 +0800 Subject: [PATCH] fix --- .../main/java/com/kylinolap/cube/CubeInstance.java | 5 +-- .../main/java/com/kylinolap/cube/CubeManager.java | 41 ++++++++++++++-------- .../com/kylinolap/cube/CubeSegmentValidator.java | 3 +- .../com/kylinolap/job/flow/JobFlowListener.java | 9 +++-- 4 files changed, 34 insertions(+), 24 deletions(-) diff --git a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java index b9bd6be..18f28f4 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeInstance.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeInstance.java @@ -413,10 +413,7 @@ public boolean needMergeImmediately(long newSegmentRangeStart, long newSegmentRa return false; } } - if (getDateRange()[1] == newSegmentRangeStart) { - return true; - } - return false; + return true; } } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeManager.java b/cube/src/main/java/com/kylinolap/cube/CubeManager.java index 0780031..8a4c2fb 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeManager.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeManager.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.base.Preconditions; import com.kylinolap.dict.DateStrDictionary; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -306,9 +307,16 @@ public CubeInstance updateCube(CubeInstance cube) throws IOException { validateNewSegments(cubeInstance, buildType, segments); - if (buildType == CubeBuildTypeEnum.MERGE || needMergeImmediately) { - this.makeDictForNewSegment(cubeInstance, segments.get(0)); - this.makeSnapshotForNewSegment(cubeInstance, segments.get(0)); + if (buildType == CubeBuildTypeEnum.MERGE) { + CubeSegment newSeg = segments.get(0); + List mergingSegments = cubeInstance.getMergingSegments(newSeg); + this.makeDictForNewSegment(cubeInstance, newSeg, mergingSegments); + this.makeSnapshotForNewSegment(newSeg, mergingSegments); + } else if (needMergeImmediately) { + CubeSegment newSeg = segments.get(0); + List mergingSegments = cubeInstance.getSegment(CubeSegmentStatusEnum.READY); + this.makeDictForNewSegment(cubeInstance, newSeg, mergingSegments); + this.makeSnapshotForNewSegment(newSeg, mergingSegments); } cubeInstance.getSegments().addAll(segments); @@ -332,15 +340,20 @@ public static String getHtableMetadataKey() { return "KYLIN_HOST"; } - public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEnum buildType, String segmentName, String lastBuildJobUuid, long lastBuildTime, long sizeKB, long sourceRecordCount, long sourceRecordsSize) throws IOException, CubeIntegrityException { + public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEnum buildType, String segmentName, String jobUuid, long lastBuildTime, long sizeKB, long sourceRecordCount, long sourceRecordsSize) throws IOException, CubeIntegrityException { List segmentsInNewStatus = cubeInstance.getSegments(CubeSegmentStatusEnum.NEW); - CubeSegment cubeSegment = cubeInstance.getSegment(segmentName, CubeSegmentStatusEnum.NEW); + CubeSegment cubeSegment = cubeInstance.getSegmentById(jobUuid); + if (cubeSegment == null) { + cubeSegment = cubeInstance.getSegment(segmentName, CubeSegmentStatusEnum.NEW); + } + Preconditions.checkNotNull(cubeSegment); + Preconditions.checkArgument(cubeSegment.getStatus() == CubeSegmentStatusEnum.NEW, "invalid status of Segment:" + cubeSegment); switch (buildType) { case BUILD: - if (cubeInstance.needMergeImmediately(cubeInstance.getSegmentById(lastBuildJobUuid))) { - cubeInstance.getSegments().removeAll(cubeInstance.getMergingSegments()); + if (cubeInstance.needMergeImmediately(cubeSegment)) { + cubeInstance.getSegments().removeAll(cubeInstance.getSegment(CubeSegmentStatusEnum.READY)); } else { if (segmentsInNewStatus.size() == 1) {// if this the last segment in // status of NEW @@ -354,7 +367,7 @@ public void updateSegmentOnJobSucceed(CubeInstance cubeInstance, CubeBuildTypeEn break; } - cubeSegment.setLastBuildJobID(lastBuildJobUuid); + cubeSegment.setLastBuildJobID(jobUuid); cubeSegment.setLastBuildTime(lastBuildTime); cubeSegment.setSizeKB(sizeKB); cubeSegment.setSourceRecords(sourceRecordCount); @@ -445,17 +458,16 @@ public LookupStringTable getLookupTable(CubeSegment cubeSegment, DimensionDesc d * @param newSeg * @throws IOException */ - private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg) throws IOException { - List mergingSegments = cube.getMergingSegments(newSeg); - + private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg, List mergingSegments) throws IOException { HashSet colsNeedMeringDict = new HashSet(); HashSet colsNeedCopyDict = new HashSet(); DictionaryManager dictMgr = this.getDictionaryManager(); - for (DimensionDesc dim : cube.getDescriptor().getDimensions()) { + CubeDesc descriptor = cube.getDescriptor(); + for (DimensionDesc dim : descriptor.getDimensions()) { for (TblColRef col : dim.getColumnRefs()) { if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) { - if (cube.getDescriptor().getFactTable().equalsIgnoreCase((String) dictMgr.decideSourceData(cube.getDescriptor(), col, null)[0])) { + if (descriptor.getFactTable().equalsIgnoreCase((String) dictMgr.decideSourceData(descriptor, col, null)[0])) { colsNeedMeringDict.add(col); } else { colsNeedCopyDict.add(col); @@ -489,8 +501,7 @@ private void makeDictForNewSegment(CubeInstance cube, CubeSegment newSeg) throws * @param cube * @param newSeg */ - private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg) { - List mergingSegments = cube.getMergingSegments(newSeg); + private void makeSnapshotForNewSegment(CubeSegment newSeg, List mergingSegments) { for (Map.Entry entry : mergingSegments.get(0).getSnapshots().entrySet()) { newSeg.putSnapshotResPath(entry.getKey(), entry.getValue()); } diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java index 76fa238..32166ad 100644 --- a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java +++ b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java @@ -73,8 +73,7 @@ private void checkContingency(CubeInstance cubeInstance, CubeSegment newSegment) } } - private void checkLoopTableConsistency(CubeInstance cube, CubeSegment newSegment) throws CubeIntegrityException { - CubeSegment cubeSeg = newSegment; + private void checkLoopTableConsistency(CubeInstance cube, CubeSegment cubeSeg) throws CubeIntegrityException { DictionaryManager dictMgr = DictionaryManager.getInstance(cube.getConfig()); List segmentList = cube.getMergingSegments(cubeSeg); diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java index 600b98b..a571828 100644 --- a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java +++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import com.kylinolap.cube.CubeSegmentStatusEnum; import org.apache.commons.lang.exception.ExceptionUtils; import org.quartz.JobDataMap; import org.quartz.JobDetail; @@ -279,9 +280,11 @@ private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineCo } CubeSegment segmentById = cubeInstance.getSegmentById(jobInstance.getUuid()); if (cubeInstance.needMergeImmediately(segmentById)) { - for (CubeSegment seg : cubeInstance.getMergingSegments()) { - sourceCount += seg.getSourceRecords(); - sourceSize += seg.getSourceRecordsSize(); + for (CubeSegment seg : cubeInstance.getSegment(CubeSegmentStatusEnum.READY)) { + if (seg.getDateRangeEnd() >= segmentById.getDateRangeStart()) { + sourceCount += seg.getSourceRecords(); + sourceSize += seg.getSourceRecordsSize(); + } } } break;