From 080e36095a2ff7176e535c0fa63fd8cdff98b590 Mon Sep 17 00:00:00 2001 From: yanghao3 Date: Tue, 24 Oct 2017 14:00:37 +0800 Subject: [PATCH] init --- .../java/org/apache/kylin/cube/CubeInstance.java | 2 +- .../java/org/apache/kylin/cube/model/CubeDesc.java | 11 +++ .../org/apache/kylin/cube/CubeManagerTest.java | 84 ++++++++++++++++++++++ .../org/apache/kylin/metadata/model/Segments.java | 23 +++++- webapp/app/js/model/cubeDescModel.js | 1 + .../partials/cubeDesigner/refresh_settings.html | 25 +++++++ 6 files changed, 144 insertions(+), 2 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index f35cfa2..cc56727 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -479,7 +479,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } public SegmentRange autoMergeCubeSegments() throws IOException { - return segments.autoMergeCubeSegments(needAutoMerge(), getName(), getDescriptor().getAutoMergeTimeRanges()); + return segments.autoMergeCubeSegments(needAutoMerge(), getName(), getDescriptor().getAutoMergeTimeRanges(), getDescriptor().getVolatileRange()); } public Segments calculateToBeSegments(CubeSegment newSegment) { diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index c9917ea..01dd302 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -162,6 +162,8 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { private long partitionDateEnd = 3153600000000L; @JsonProperty("auto_merge_time_ranges") private long[] autoMergeTimeRanges; + @JsonProperty("volatile_range") + private long volatileRange = 0; @JsonProperty("retention_range") private long retentionRange = 0; @JsonProperty("engine_type") @@ -1101,6 +1103,14 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { return result; } + public long getVolatileRange() { + return volatileRange; + } + + public void setVolatileRange(long volatileRange) { + this.volatileRange = volatileRange; + } + public long getRetentionRange() { return retentionRange; } @@ -1338,6 +1348,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { newCubeDesc.setAutoMergeTimeRanges(cubeDesc.getAutoMergeTimeRanges()); newCubeDesc.setPartitionDateStart(cubeDesc.getPartitionDateStart()); newCubeDesc.setPartitionDateEnd(cubeDesc.getPartitionDateEnd()); + newCubeDesc.setVolatileRange(cubeDesc.getVolatileRange()); newCubeDesc.setRetentionRange(cubeDesc.getRetentionRange()); newCubeDesc.setEngineType(cubeDesc.getEngineType()); newCubeDesc.setStorageType(cubeDesc.getStorageType()); diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java index d8d48f7..26f3a6b 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java @@ -305,6 +305,90 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { } @Test + public void testAutoMergeWithVolatileRange() throws Exception { + CubeManager mgr = CubeManager.getInstance(getTestConfig()); + CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + + cube.getDescriptor().setAutoMergeTimeRanges(new long[]{2000, 6000}); + + mgr.updateCube(new CubeUpdate(cube)); + + assertTrue(cube.needAutoMerge()); + + // no segment at first + assertEquals(0, cube.getSegments().size()); + + // append first + CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L)); + seg1.setStatus(SegmentStatusEnum.READY); + + CubeSegment seg3 = mgr.appendSegment(cube, new TSRange( 2000L, 4000L)); + seg3.setStatus(SegmentStatusEnum.READY); + + assertEquals(2, cube.getSegments().size()); + + SegmentRange mergedSeg = cube.autoMergeCubeSegments(); + + assertTrue(mergedSeg == null); + + assertEquals(2, cube.getSegments().size()); + + // append a new seg + + CubeSegment seg4 = mgr.appendSegment(cube, new TSRange(4000L, 8000L)); + seg4.setStatus(SegmentStatusEnum.READY); + + assertEquals(3, cube.getSegments().size()); + + cube.getDescriptor().setVolatileRange(10000); + + mergedSeg = cube.autoMergeCubeSegments(); + + assertTrue(mergedSeg == null); + + //will merge after change the volatile_range + + cube.getDescriptor().setVolatileRange(0); + + mergedSeg = cube.autoMergeCubeSegments(); + + assertTrue(mergedSeg != null); + + assertTrue((Long) mergedSeg.start.v == 2000 && (Long) mergedSeg.end.v == 8000); + + // fill the gap + + CubeSegment seg2 = mgr.appendSegment(cube, new TSRange( 1000L, 2000L)); + seg2.setStatus(SegmentStatusEnum.READY); + + assertEquals(4, cube.getSegments().size()); + + cube.getDescriptor().setVolatileRange(10000); + + mergedSeg = cube.autoMergeCubeSegments(); + + assertTrue(mergedSeg == null); + + //will merge after change the volatile_range + cube.getDescriptor().setVolatileRange(0); + + mergedSeg = cube.autoMergeCubeSegments(); + + assertTrue(mergedSeg != null); + + assertTrue((Long) mergedSeg.start.v == 0 && (Long) mergedSeg.end.v == 8000); + + cube.getDescriptor().setVolatileRange(1000); + + mergedSeg = cube.autoMergeCubeSegments(); + + assertTrue(mergedSeg != null); + + assertTrue((Long) mergedSeg.start.v == 0 && (Long) mergedSeg.end.v == 2000); + + } + + @Test public void testGetCubeNameWithNamespace() { System.setProperty("kylin.storage.hbase.table-name-prefix", "HELLO_"); try { diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java index 7af1ebc..99c6eb4 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java @@ -169,7 +169,26 @@ public class Segments extends ArrayList implements Serial return result; } - public SegmentRange autoMergeCubeSegments(boolean needAutoMerge, String cubeName, long[] timeRanges) throws IOException { + public void removeLatestSegmentByVolatileRange(Segments segs, long volatileRange) { + if(volatileRange <= 0 || segs.isEmpty()) { + return; + } + long tail = segs.get(segs.size() - 1).getTSRange().end.v; + long head = tail - volatileRange; + Segments volatileSegs = new Segments(); + for(T seg: segs) { + if(seg.getTSRange().end.v >= head) { + logger.warn("segment in volatile range: seg:" + seg.toString() + + "rangeStart:" + seg.getTSRange().start.v + ", rangeEnd" + seg.getTSRange().end.v); + volatileSegs.add(seg); + } + } + + segs.removeAll(volatileSegs); + + } + + public SegmentRange autoMergeCubeSegments(boolean needAutoMerge, String cubeName, long[] timeRanges, long volatileRange) throws IOException { if (!needAutoMerge) { logger.debug("Cube " + cubeName + " doesn't need auto merge"); return null; @@ -195,6 +214,8 @@ public class Segments extends ArrayList implements Serial } } + removeLatestSegmentByVolatileRange(readySegs, volatileRange); + // exclude those already under merging segments readySegs.removeAll(mergingSegs); diff --git a/webapp/app/js/model/cubeDescModel.js b/webapp/app/js/model/cubeDescModel.js index b9b95da..cf44a25 100644 --- a/webapp/app/js/model/cubeDescModel.js +++ b/webapp/app/js/model/cubeDescModel.js @@ -52,6 +52,7 @@ KylinApp.service('CubeDescModel', function (kylinConfig) { "hbase_mapping": { "column_family": [] }, + "volatile_range": "0", "retention_range": "0", "status_need_notify":['ERROR', 'DISCARDED', 'SUCCEED'], "auto_merge_time_ranges": [604800000, 2419200000], diff --git a/webapp/app/partials/cubeDesigner/refresh_settings.html b/webapp/app/partials/cubeDesigner/refresh_settings.html index fc5f657..2d2509d 100755 --- a/webapp/app/partials/cubeDesigner/refresh_settings.html +++ b/webapp/app/partials/cubeDesigner/refresh_settings.html @@ -100,6 +100,27 @@
+ +
+ + + {{cubeMetaFrame.volatile_range | millisecondsToDay}} +
+
+
+ + + + + + +
+
+ @@ -161,6 +182,10 @@

The thresholds will be checked ascendingly to see if any consectuive segments' time range has exceeded it. For example the [7 days, 30 days] will result in daily incremental segments being merged every 7 days, and the 7-days segments will get merged every 30 days.

+ + -- 1.9.1