From aa380697c079527a79fb7d782e922e546daec5b9 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Thu, 31 Mar 2016 18:06:38 +0800 Subject: [PATCH] KYLIN-1500: Split large gap for solving out of memory issue when streaming_fillgap --- .../kylin/engine/streaming/BootstrapConfig.java | 9 ++++++++ .../kylin/engine/streaming/cli/StreamingCLI.java | 26 ++++++++++++++++++++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java index e690e9a..35bdfa8 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java @@ -27,6 +27,7 @@ public class BootstrapConfig { private long end = 0L; private boolean fillGap; + private long maxFillGapRange = 4 * 3600 * 1000L; public long getStart() { return start; @@ -59,4 +60,12 @@ public class BootstrapConfig { public void setFillGap(boolean fillGap) { this.fillGap = fillGap; } + + public long getMaxFillGapRange() { + return maxFillGapRange; + } + + public void setMaxFillGapRange(long maxFillGapRange) { + this.maxFillGapRange = maxFillGapRange; + } } diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java index 0bab396..487b4b9 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java @@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming.cli; import java.util.List; +import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; @@ -76,6 +77,9 @@ public class StreamingCLI { case "-fillGap": bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i])); break; + case "-maxFillGapRange": + bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i])); + break; default: logger.warn("ignore this arg:" + argName); } @@ -85,7 +89,12 @@ public class StreamingCLI { final List> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName()); logger.info("all gaps:" + StringUtils.join(gaps, ",")); for (Pair gap : gaps) { - startOneOffCubeStreaming(bootstrapConfig.getCubeName(), gap.getFirst(), gap.getSecond()); + List> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange()); + for (Pair splitGap : splitGaps) { + logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond()); + startOneOffCubeStreaming(bootstrapConfig.getCubeName(), splitGap.getFirst(), splitGap.getSecond()); + logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond()); + } } } else { startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()); @@ -98,7 +107,20 @@ public class StreamingCLI { System.exit(-1); } } - + + private static List> splitGap(Pair gap, long maxFillGapRange) { + List> gaps = Lists.newArrayList(); + Long startTime = gap.getFirst(); + + while (startTime < gap.getSecond()) { + Long endTime = gap.getSecond() <= startTime + maxFillGapRange ? gap.getSecond() : startTime + maxFillGapRange; + gaps.add(Pair.newPair(startTime, endTime)); + startTime = endTime; + } + + return gaps; + } + private static void startOneOffCubeStreaming(String cubeName, long start, long end) { final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build(); runnable.run(); -- 2.5.4 (Apple Git-61)