From a4f9c3922ef1d7b7cac535347f455bce783e9d2f Mon Sep 17 00:00:00 2001 From: "U-CORP\\mingmwang" Date: Thu, 12 Oct 2017 17:05:52 +0800 Subject: [PATCH] APACHE-KYLIN-2932: Simplify the thread model for in-memory cubing --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../cube/inmemcubing/CompoundCuboidWriter.java | 8 + .../kylin/cube/inmemcubing/DoggedCubeBuilder.java | 4 +- .../cube/inmemcubing/ICuboidGTTableWriter.java | 47 +++ .../kylin/cube/inmemcubing/ICuboidWriter.java | 3 + .../kylin/cube/inmemcubing/InMemCubeBuilder.java | 14 +- .../apache/kylin/cube/inmemcubing2/CuboidTask.java | 53 +++ .../DefaultCuboidCollectorWithCallBack.java | 53 +++ .../cube/inmemcubing2/DoggedCubeBuilder2.java | 442 +++++++++++++++++++++ .../inmemcubing2/ICuboidCollectorWithCallBack.java | 28 ++ .../cube/inmemcubing2/ICuboidResultListener.java | 25 ++ .../kylin/cube/inmemcubing2/InMemCubeBuilder2.java | 408 +++++++++++++++++++ .../mr/steps/InMemCuboidFromBaseCuboidMapper.java | 25 +- .../kylin/engine/mr/steps/InMemCuboidMapper.java | 23 +- .../engine/mr/steps/InMemCuboidMapperBase.java | 30 +- .../kylin/engine/mr/steps/KVGTRecordWriter.java | 4 +- .../inmemcubing/ITDoggedCubeBuilderStressTest.java | 6 + .../cube/inmemcubing/ITDoggedCubeBuilderTest.java | 73 +++- .../cube/inmemcubing/ITInMemCubeBuilderTest.java | 7 + .../storage/hbase/steps/HBaseCuboidWriter.java | 4 +- 20 files changed, 1194 insertions(+), 67 deletions(-) create mode 100755 core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java create mode 100755 core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java create mode 100755 core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java create mode 100755 core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java create mode 100755 core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java create mode 100755 core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java create mode 100755 core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 7b48935..1901495 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -665,6 +665,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(this.getOptional("kylin.job.metadata-persist-retry", "5")); } + public String getCubeInMemBuilderClass() { + return getOptional("kylin.job.cube-inmem-builder-class", "org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder"); + } + // ============================================================================ // SOURCE.HIVE // ============================================================================ diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java index c82f418..df77978 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java @@ -21,6 +21,7 @@ package org.apache.kylin.cube.inmemcubing; import java.io.IOException; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GridTable; /** */ @@ -39,6 +40,13 @@ public class CompoundCuboidWriter implements ICuboidWriter { writer.write(cuboidId, record); } } + + @Override + public void write(long cuboidId, GridTable table) throws IOException { + for (ICuboidWriter writer : cuboidWriters) { + writer.write(cuboidId, table); + } + } @Override public void flush() throws IOException { diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java index d761505..42e8c86 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java @@ -24,9 +24,9 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.PriorityQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentNavigableMap; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Dictionary; @@ -194,7 +194,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { final RecordConsumeBlockingQueueController inputController; final InMemCubeBuilder builder; - ConcurrentNavigableMap buildResult; + NavigableMap buildResult; RuntimeException exception; public SplitThread(final int num, final RecordConsumeBlockingQueueController inputController) { diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java new file mode 100755 index 0000000..93a7994 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + + +package org.apache.kylin.cube.inmemcubing; + +import java.io.IOException; + +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTScanRequestBuilder; +import org.apache.kylin.gridtable.GridTable; +import org.apache.kylin.gridtable.IGTScanner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ICuboidGTTableWriter implements ICuboidWriter{ + + private static Logger logger = LoggerFactory.getLogger(ICuboidGTTableWriter.class); + + @Override + public void write(long cuboidId, GridTable gridTable) throws IOException { + long startTime = System.currentTimeMillis(); + GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(); + IGTScanner scanner = gridTable.scan(req); + for (GTRecord record : scanner) { + write(cuboidId, record); + } + scanner.close(); + logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms"); + } +} diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java index 3f6cb0c..4ae182e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java @@ -21,6 +21,7 @@ package org.apache.kylin.cube.inmemcubing; import java.io.IOException; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GridTable; /** */ @@ -28,6 +29,8 @@ public interface ICuboidWriter { void write(long cuboidId, GTRecord record) throws IOException; + void write(long cuboidId, GridTable table) throws IOException; + void flush() throws IOException; void close() throws IOException; diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index db3eb5d..899c145 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -19,14 +19,13 @@ package org.apache.kylin.cube.inmemcubing; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -47,6 +46,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanRequestBuilder; import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.IGTStore; import org.apache.kylin.measure.topn.Counter; import org.apache.kylin.measure.topn.TopNCounter; import org.apache.kylin.metadata.datatype.DoubleMutable; @@ -114,7 +114,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest. // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget); // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET); - ConcurrentDiskStore store = new ConcurrentDiskStore(info); + IGTStore store = new ConcurrentDiskStore(info); GridTable gridTable = new GridTable(info, store); return gridTable; @@ -123,7 +123,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { @Override public void build(BlockingQueue input, InputConverterUnit inputConverterUnit, ICuboidWriter output) throws IOException { - ConcurrentNavigableMap result = build( + NavigableMap result = build( RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input)); try { for (CuboidResult cuboidResult : result.values()) { @@ -135,9 +135,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } } - public ConcurrentNavigableMap build(RecordConsumeBlockingQueueController input) + public NavigableMap build(RecordConsumeBlockingQueueController input) throws IOException { - final ConcurrentNavigableMap result = new ConcurrentSkipListMap(); + final NavigableMap result = new ConcurrentSkipListMap(); build(input, new ICuboidCollector() { @Override public void collect(CuboidResult cuboidResult) { @@ -216,7 +216,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } private void throwExceptionIfAny() throws IOException { - ArrayList errors = new ArrayList(); + List errors = Lists.newArrayList(); for (int i = 0; i < taskThreadCount; i++) { Throwable t = taskThreadExceptions[i]; if (t != null) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java new file mode 100755 index 0000000..cf54eb6 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + + +package org.apache.kylin.cube.inmemcubing2; + +import java.io.IOException; +import java.util.concurrent.RecursiveTask; + +import org.apache.kylin.cube.inmemcubing.CuboidResult; + +@SuppressWarnings("serial") +class CuboidTask extends RecursiveTask implements Comparable { + final CuboidResult parent; + final long childCuboidId; + final InMemCubeBuilder2 cubeBuilder; + + CuboidTask(CuboidResult parent, long childCuboidId, InMemCubeBuilder2 cubeBuilder) { + this.parent = parent; + this.childCuboidId = childCuboidId; + this.cubeBuilder = cubeBuilder; + } + + @Override + public int compareTo(CuboidTask o) { + long comp = this.childCuboidId - o.childCuboidId; + return comp < 0 ? -1 : (comp > 0 ? 1 : 0); + } + + @Override + protected CuboidResult compute() { + try { + return cubeBuilder.buildCuboid(this); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java new file mode 100755 index 0000000..d7f738d --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing2; + +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.kylin.cube.inmemcubing.CuboidResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultCuboidCollectorWithCallBack implements ICuboidCollectorWithCallBack{ + + private static Logger logger = LoggerFactory.getLogger(DefaultCuboidCollectorWithCallBack.class); + + final ConcurrentNavigableMap result = new ConcurrentSkipListMap(); + final ICuboidResultListener listener; + + public DefaultCuboidCollectorWithCallBack(ICuboidResultListener listener){ + this.listener = listener; + } + + @Override + public void collectAndNotify(CuboidResult cuboidResult) { + logger.info("collecting CuboidResult cuboid id:" + cuboidResult.cuboidId); + result.put(cuboidResult.cuboidId, cuboidResult); + if (listener != null) { + listener.finish(cuboidResult); + } + } + + @Override + public NavigableMap getAllResult() { + return result; + } +} diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java new file mode 100755 index 0000000..4c5da87 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing2; + +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.RecursiveTask; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; +import org.apache.kylin.cube.inmemcubing.CuboidResult; +import org.apache.kylin.cube.inmemcubing.ICuboidWriter; +import org.apache.kylin.cube.inmemcubing.InputConverterUnit; +import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequestBuilder; +import org.apache.kylin.gridtable.GridTable; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.measure.MeasureAggregators; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; + +public class DoggedCubeBuilder2 extends AbstractInMemCubeBuilder { + private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder2.class); + + public DoggedCubeBuilder2(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, + Map> dictionaryMap) { + super(cuboidScheduler, flatDesc, dictionaryMap); + } + + @Override + public void build(BlockingQueue input, InputConverterUnit inputConverterUnit, ICuboidWriter output) + throws IOException { + new BuildOnce().build(input, inputConverterUnit, output); + } + + private class BuildOnce { + public void build(BlockingQueue input, InputConverterUnit inputConverterUnit, ICuboidWriter output) + throws IOException { + final RecordConsumeBlockingQueueController inputController = RecordConsumeBlockingQueueController + .getQueueController(inputConverterUnit, input); + + final List builderList = new CopyOnWriteArrayList<>(); + + ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() { + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex()); + return worker; + } + }; + + ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true); + CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output); + + Stopwatch sw = new Stopwatch(); + sw.start(); + logger.info("Dogged Cube Build2 start"); + try { + BaseCuboidTask task = new BaseCuboidTask<>(inputController, 1, resultWatcher); + builderPool.execute(task); + do { + builderList.add(task.getInternalBuilder()); + //Exception will be thrown here if cube building failure + task.join(); + task = task.nextTask(); + } while (task != null); + + logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids"); + for (final InMemCubeBuilder2 builder : builderList) { + builderPool.submit(new Runnable() { + @Override + public void run() { + builder.startBuildFromBaseCuboid(); + } + }); + } + resultWatcher.start(); + logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsedMillis() + " ms"); + } catch (Throwable e) { + logger.error("Dogged Cube Build2 error", e); + if (e instanceof Error) + throw (Error) e; + else if (e instanceof RuntimeException) + throw (RuntimeException) e; + else + throw new IOException(e); + } finally { + output.close(); + closeGirdTables(builderList); + sw.stop(); + builderPool.shutdownNow(); + logger.info("Dogged Cube Build2 end, totally took " + sw.elapsedMillis() + " ms"); + logger.info("Dogged Cube Build2 return"); + } + } + + private void closeGirdTables(List builderList) { + for (InMemCubeBuilder2 inMemCubeBuilder : builderList) { + for (CuboidResult cuboidResult : inMemCubeBuilder.getResultCollector().getAllResult().values()) { + closeGirdTable(cuboidResult.table); + } + } + } + + private void closeGirdTable(GridTable gridTable) { + try { + gridTable.close(); + } catch (Throwable e) { + logger.error("Error closing grid table " + gridTable, e); + } + } + } + + private class BaseCuboidTask extends RecursiveTask { + private static final long serialVersionUID = -5408592502260876799L; + + private final int splitSeq; + private final ICuboidResultListener resultListener; + + private RecordConsumeBlockingQueueController inputController; + private InMemCubeBuilder2 builder; + + private volatile BaseCuboidTask next; + + public BaseCuboidTask(final RecordConsumeBlockingQueueController inputController, int splitSeq, + ICuboidResultListener resultListener) { + this.inputController = inputController; + this.splitSeq = splitSeq; + this.resultListener = resultListener; + this.builder = new InMemCubeBuilder2(cuboidScheduler, flatDesc, dictionaryMap); + builder.setReserveMemoryMB(reserveMemoryMB); + builder.setConcurrentThreads(taskThreadCount); + logger.info("Split #" + splitSeq + " kickoff"); + } + + @Override + protected CuboidResult compute() { + try { + CuboidResult baseCuboidResult = builder.buildBaseCuboid(inputController, resultListener); + if (!inputController.ifEnd()) { + next = new BaseCuboidTask<>(inputController, splitSeq + 1, resultListener); + next.fork(); + } + logger.info("Split #" + splitSeq + " finished"); + return baseCuboidResult; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public InMemCubeBuilder2 getInternalBuilder() { + return builder; + } + + public BaseCuboidTask nextTask() { + return next; + } + } + + /** + * Class response for watch the cube building result, monitor the cube building process and trigger merge actions if required. + * + */ + private class CuboidResultWatcher implements ICuboidResultListener { + final BlockingQueue outputQueue; + final Map> pendingQueue = Maps.newHashMap(); + final List builderList; + final ICuboidWriter output; + + public CuboidResultWatcher(final List builderList, final ICuboidWriter output) { + this.outputQueue = Queues.newLinkedBlockingQueue(); + this.builderList = builderList; + this.output = output; + } + + public void start() throws IOException { + SplitMerger merger = new SplitMerger(); + while (true) { + if (!outputQueue.isEmpty()) { + List splitResultReturned = Lists.newArrayList(); + outputQueue.drainTo(splitResultReturned); + for (CuboidResult splitResult : splitResultReturned) { + if (builderList.size() == 1) { + merger.mergeAndOutput(Lists.newArrayList(splitResult), output); + } else { + List cuboidResultList = pendingQueue.get(splitResult.cuboidId); + if (cuboidResultList == null) { + cuboidResultList = Lists.newArrayListWithExpectedSize(builderList.size()); + cuboidResultList.add(splitResult); + pendingQueue.put(splitResult.cuboidId, cuboidResultList); + } else { + cuboidResultList.add(splitResult); + } + if (cuboidResultList.size() == builderList.size()) { + merger.mergeAndOutput(cuboidResultList, output); + pendingQueue.remove(splitResult.cuboidId); + } + } + } + } + + boolean jobFinished = isAllBuildFinished(); + if (outputQueue.isEmpty() && !jobFinished) { + boolean ifWait = true; + for (InMemCubeBuilder2 builder : builderList) { + Queue queue = builder.getCompletedTaskQueue(); + while (queue.size() > 0) { + CuboidTask childTask = queue.poll(); + if (childTask.isCompletedAbnormally()) { + throw new RuntimeException(childTask.getException()); + } + ifWait = false; + } + } + if (ifWait) { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } else if (outputQueue.isEmpty() && pendingQueue.isEmpty() && jobFinished) { + return; + } + } + } + + private boolean isAllBuildFinished() { + for (InMemCubeBuilder2 split : builderList) { + if (!split.isAllCuboidDone()) { + return false; + } + } + return true; + } + + @Override + public void finish(CuboidResult result) { + Stopwatch stopwatch = new Stopwatch().start(); + int nRetries = 0; + while (!outputQueue.offer(result)) { + nRetries++; + long sleepTime = stopwatch.elapsedMillis(); + if (sleepTime > 3600000L) { + stopwatch.stop(); + throw new RuntimeException( + "OutputQueue Full. Cannot offer to the output queue after waiting for one hour!!! Current queue size: " + + outputQueue.size()); + } + logger.warn("OutputQueue Full. Queue size: " + outputQueue.size() + ". Total sleep time : " + sleepTime + + ", and retry count : " + nRetries); + try { + Thread.sleep(5000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + stopwatch.stop(); + } + } + + private class SplitMerger { + MeasureAggregators reuseAggrs; + Object[] reuseMetricsArray; + ByteArray reuseMetricsSpace; + + long lastCuboidColumnCount; + ImmutableBitSet lastMetricsColumns; + + SplitMerger() { + reuseAggrs = new MeasureAggregators(cubeDesc.getMeasures()); + reuseMetricsArray = new Object[cubeDesc.getMeasures().size()]; + } + + public void mergeAndOutput(List splitResultList, ICuboidWriter output) throws IOException { + if (splitResultList.size() == 1) { + CuboidResult cuboidResult = splitResultList.get(0); + outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output); + return; + } + LinkedList open = Lists.newLinkedList(); + for (CuboidResult splitResult : splitResultList) { + open.add(new ResultMergeSlot(splitResult)); + } + + PriorityQueue heap = new PriorityQueue(); + while (true) { + // ready records in open slots and add to heap + while (!open.isEmpty()) { + ResultMergeSlot slot = open.removeFirst(); + if (slot.fetchNext()) { + heap.add(slot); + } + } + + // find the smallest on heap + ResultMergeSlot smallest = heap.poll(); + if (smallest == null) + break; + open.add(smallest); + + // merge with slots having the same key + if (smallest.isSameKey(heap.peek())) { + Object[] metrics = getMetricsValues(smallest.currentRecord); + reuseAggrs.reset(); + reuseAggrs.aggregate(metrics); + do { + ResultMergeSlot slot = heap.poll(); + open.add(slot); + metrics = getMetricsValues(slot.currentRecord); + reuseAggrs.aggregate(metrics); + } while (smallest.isSameKey(heap.peek())); + + reuseAggrs.collectStates(metrics); + setMetricsValues(smallest.currentRecord, metrics); + } + output.write(smallest.currentCuboidId, smallest.currentRecord); + } + } + + private void setMetricsValues(GTRecord record, Object[] metricsValues) { + ImmutableBitSet metrics = getMetricsColumns(record); + + if (reuseMetricsSpace == null) { + reuseMetricsSpace = new ByteArray(record.getInfo().getMaxColumnLength(metrics)); + } + + record.setValues(metrics, reuseMetricsSpace, metricsValues); + } + + private Object[] getMetricsValues(GTRecord record) { + ImmutableBitSet metrics = getMetricsColumns(record); + return record.getValues(metrics, reuseMetricsArray); + } + + private ImmutableBitSet getMetricsColumns(GTRecord record) { + // metrics columns always come after dimension columns + if (lastCuboidColumnCount == record.getInfo().getColumnCount()) + return lastMetricsColumns; + + int to = record.getInfo().getColumnCount(); + int from = to - reuseMetricsArray.length; + lastCuboidColumnCount = record.getInfo().getColumnCount(); + lastMetricsColumns = new ImmutableBitSet(from, to); + return lastMetricsColumns; + } + } + + private static class ResultMergeSlot implements Comparable { + CuboidResult splitResult; + IGTScanner scanner; + Iterator recordIterator; + + long currentCuboidId; + GTRecord currentRecord; + + public ResultMergeSlot(CuboidResult splitResult) { + this.splitResult = splitResult; + } + + public boolean fetchNext() throws IOException { + if (recordIterator == null) { + currentCuboidId = splitResult.cuboidId; + scanner = splitResult.table.scan(new GTScanRequestBuilder().setInfo(splitResult.table.getInfo()) + .setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest()); + recordIterator = scanner.iterator(); + } + + if (recordIterator.hasNext()) { + currentRecord = recordIterator.next(); + return true; + } else { + scanner.close(); + recordIterator = null; + return false; + } + } + + @Override + public int compareTo(ResultMergeSlot o) { + long cuboidComp = this.currentCuboidId - o.currentCuboidId; + if (cuboidComp != 0) + return cuboidComp < 0 ? -1 : 1; + + // note GTRecord.equals() don't work because the two GTRecord comes from different GridTable + ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey(); + for (int i = 0; i < pk.trueBitCount(); i++) { + int c = pk.trueBitAt(i); + int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c)); + if (comp != 0) + return comp; + } + return 0; + } + + public boolean isSameKey(ResultMergeSlot o) { + if (o == null) + return false; + else + return this.compareTo(o) == 0; + } + + }; +} \ No newline at end of file diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java new file mode 100755 index 0000000..b669bbe --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing2; + +import java.util.NavigableMap; + +import org.apache.kylin.cube.inmemcubing.CuboidResult; + +public interface ICuboidCollectorWithCallBack { + void collectAndNotify(CuboidResult result); + NavigableMap getAllResult(); +} \ No newline at end of file diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java new file mode 100755 index 0000000..6d80f00 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing2; + +import org.apache.kylin.cube.inmemcubing.CuboidResult; + +public interface ICuboidResultListener { + void finish(CuboidResult cuboidResult); +} \ No newline at end of file diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java new file mode 100755 index 0000000..35a4d09 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing2; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.MemoryBudgetController; +import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; +import org.apache.kylin.cube.inmemcubing.ConcurrentDiskStore; +import org.apache.kylin.cube.inmemcubing.CuboidResult; +import org.apache.kylin.cube.inmemcubing.ICuboidWriter; +import org.apache.kylin.cube.inmemcubing.InMemCubeBuilderUtils; +import org.apache.kylin.cube.inmemcubing.InputConverter; +import org.apache.kylin.cube.inmemcubing.InputConverterUnit; +import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController; +import org.apache.kylin.cube.kv.CubeDimEncMap; +import org.apache.kylin.gridtable.GTAggregateScanner; +import org.apache.kylin.gridtable.GTBuilder; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTScanRequestBuilder; +import org.apache.kylin.gridtable.GridTable; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.IGTStore; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; + +/** + * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits. + * Assumes base cuboid fits in memory or otherwise OOM exception will occur. + */ +public class InMemCubeBuilder2 extends AbstractInMemCubeBuilder { + private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder2.class); + + // by experience + private static final double DERIVE_AGGR_CACHE_CONSTANT_FACTOR = 0.1; + private static final double DERIVE_AGGR_CACHE_VARIABLE_FACTOR = 0.9; + + protected final String[] metricsAggrFuncs; + protected final MeasureDesc[] measureDescs; + protected final int measureCount; + + private MemoryBudgetController memBudget; + protected final long baseCuboidId; + private CuboidResult baseResult; + + private Queue completedTaskQueue; + private AtomicInteger taskCuboidCompleted; + + private ICuboidCollectorWithCallBack resultCollector; + + public InMemCubeBuilder2(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, + Map> dictionaryMap) { + super(cuboidScheduler, flatDesc, dictionaryMap); + this.measureCount = cubeDesc.getMeasures().size(); + this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); + List metricsAggrFuncsList = Lists.newArrayList(); + + for (int i = 0; i < measureCount; i++) { + MeasureDesc measureDesc = measureDescs[i]; + metricsAggrFuncsList.add(measureDesc.getFunction().getExpression()); + } + this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]); + this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + } + + public int getBaseResultCacheMB() { + return baseResult.aggrCacheMB; + } + + private GridTable newGridTableByCuboidID(long cuboidID) throws IOException { + GTInfo info = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, cuboidID), + new CubeDimEncMap(cubeDesc, dictionaryMap)); + + // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest. + // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget); + // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET); + IGTStore store = new ConcurrentDiskStore(info); + + GridTable gridTable = new GridTable(info, store); + return gridTable; + } + + @Override + public void build(BlockingQueue input, InputConverterUnit inputConverterUnit, ICuboidWriter output) + throws IOException { + NavigableMap result = buildAndCollect( + RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input), null); + try { + for (CuboidResult cuboidResult : result.values()) { + outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output); + cuboidResult.table.close(); + } + } finally { + output.close(); + } + } + + /** + * Build all the cuboids and wait for all the tasks finished. + * + * @param input + * @param listener + * @return + * @throws IOException + */ + private NavigableMap buildAndCollect(final RecordConsumeBlockingQueueController input, + final ICuboidResultListener listener) throws IOException { + + long startTime = System.currentTimeMillis(); + logger.info("In Mem Cube Build2 start, " + cubeDesc.getName()); + + // build base cuboid + buildBaseCuboid(input, listener); + + ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() { + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + worker.setName("inmem-cubing-cuboid-worker-" + worker.getPoolIndex()); + return worker; + } + }; + ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true); + ForkJoinTask rootTask = builderPool.submit(new Runnable() { + @Override + public void run() { + startBuildFromBaseCuboid(); + } + }); + rootTask.join(); + + long endTime = System.currentTimeMillis(); + logger.info("In Mem Cube Build2 end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms"); + logger.info("total CuboidResult count:" + resultCollector.getAllResult().size()); + return resultCollector.getAllResult(); + } + + public ICuboidCollectorWithCallBack getResultCollector() { + return resultCollector; + } + + public CuboidResult buildBaseCuboid(RecordConsumeBlockingQueueController input, + final ICuboidResultListener listener) throws IOException { + completedTaskQueue = new LinkedBlockingQueue(); + taskCuboidCompleted = new AtomicInteger(0); + + resultCollector = new DefaultCuboidCollectorWithCallBack(listener); + + MemoryBudgetController.MemoryWaterLevel baseCuboidMemTracker = new MemoryWaterLevel(); + baseCuboidMemTracker.markLow(); + baseResult = createBaseCuboid(input, baseCuboidMemTracker); + + if (baseResult.nRows == 0) { + taskCuboidCompleted.set(cuboidScheduler.getCuboidCount()); + return baseResult; + } + + baseCuboidMemTracker.markLow(); + baseResult.aggrCacheMB = Math.max(baseCuboidMemTracker.getEstimateMB(), 10); // 10 MB at minimal + + makeMemoryBudget(); + return baseResult; + } + + public CuboidResult buildCuboid(CuboidTask task) throws IOException { + CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId); + completedTaskQueue.add(task); + addChildTasks(newCuboid); + return newCuboid; + } + + private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException { + final String consumerName = "AggrCache@Cuboid " + cuboidId; + MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() { + @Override + public int freeUp(int mb) { + return 0; // cannot free up on demand + } + + @Override + public String toString() { + return consumerName; + } + }; + + // reserve memory for aggregation cache, can't be larger than the parent + memBudget.reserveInsist(consumer, parent.aggrCacheMB); + try { + return aggregateCuboid(parent, cuboidId); + } finally { + memBudget.reserve(consumer, 0); + } + } + + public boolean isAllCuboidDone() { + return taskCuboidCompleted.get() == cuboidScheduler.getCuboidCount(); + } + + public void startBuildFromBaseCuboid() { + addChildTasks(baseResult); + } + + private void addChildTasks(CuboidResult parent) { + List children = cuboidScheduler.getSpanningCuboid(parent.cuboidId); + if (children != null && !children.isEmpty()) { + List childTasks = Lists.newArrayListWithExpectedSize(children.size()); + for (Long child : children) { + CuboidTask task = new CuboidTask(parent, child, this); + childTasks.add(task); + task.fork(); + } + for (CuboidTask childTask : childTasks) { + childTask.join(); + } + } + } + + public Queue getCompletedTaskQueue() { + return completedTaskQueue; + } + + private void makeMemoryBudget() { + int systemAvailMB = MemoryBudgetController.gcAndGetSystemAvailMB(); + logger.info("System avail " + systemAvailMB + " MB"); + int reserve = reserveMemoryMB; + logger.info("Reserve " + reserve + " MB for system basics"); + + int budget = systemAvailMB - reserve; + if (budget < baseResult.aggrCacheMB) { + // make sure we have base aggr cache as minimal + budget = baseResult.aggrCacheMB; + logger.warn("System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + + " MB), consider increase JVM heap -Xmx"); + } + + logger.info("Memory Budget is " + budget + " MB"); + memBudget = new MemoryBudgetController(budget); + } + + private CuboidResult createBaseCuboid(RecordConsumeBlockingQueueController input, + MemoryBudgetController.MemoryWaterLevel baseCuboidMemTracker) throws IOException { + logger.info("Calculating base cuboid " + baseCuboidId); + + Stopwatch sw = new Stopwatch(); + sw.start(); + GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId); + GTBuilder baseBuilder = baseCuboid.rebuild(); + IGTScanner baseInput = new InputConverter<>(baseCuboid.getInfo(), input); + + Pair dimensionMetricsBitSet = InMemCubeBuilderUtils + .getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount); + GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null) + .setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond()) + .setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest(); + GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req); + aggregationScanner.trackMemoryLevel(baseCuboidMemTracker); + + int count = 0; + for (GTRecord r : aggregationScanner) { + if (count == 0) { + baseCuboidMemTracker.markHigh(); + } + baseBuilder.write(r); + count++; + } + aggregationScanner.close(); + baseBuilder.close(); + + sw.stop(); + logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + sw.elapsedMillis() + "ms"); + + int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() + / MemoryBudgetController.ONE_MB); + logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB"); + + return updateCuboidResult(baseCuboidId, baseCuboid, count, sw.elapsedMillis(), 0, + input.inputConverterUnit.ifChange()); + } + + private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, + int aggrCacheMB) { + return updateCuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB, true); + } + + private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB, + boolean ifCollect) { + if (aggrCacheMB <= 0 && baseResult != null) { + aggrCacheMB = (int) Math.round( + (DERIVE_AGGR_CACHE_CONSTANT_FACTOR + DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows) // + * baseResult.aggrCacheMB); + } + + CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB); + taskCuboidCompleted.incrementAndGet(); + + if (ifCollect) { + resultCollector.collectAndNotify(result); + } + return result; + } + + protected CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException { + final Pair allNeededColumns = InMemCubeBuilderUtils + .getDimensionAndMetricColumnBitSet(parent.cuboidId, cuboidId, measureCount); + return scanAndAggregateGridTable(parent.table, newGridTableByCuboidID(cuboidId), parent.cuboidId, + cuboidId, allNeededColumns.getFirst(), allNeededColumns.getSecond()); + } + + private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, + ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException { + GTInfo info = gridTable.getInfo(); + GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null) + .setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(metricsAggrFuncs) + .setFilterPushDown(null).createGTScanRequest(); + GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req); + + // for child cuboid, some measures don't need aggregation. + if (parentId != cuboidId) { + boolean[] aggrMask = new boolean[measureDescs.length]; + for (int i = 0; i < measureDescs.length; i++) { + aggrMask[i] = !measureDescs[i].getFunction().getMeasureType().onlyAggrInBaseCuboid(); + + if (!aggrMask[i]) { + logger.info(measureDescs[i].toString() + " doesn't need aggregation."); + } + } + scanner.setAggrMask(aggrMask); + } + + return scanner; + } + + protected CuboidResult scanAndAggregateGridTable(GridTable gridTable, GridTable newGridTable, long parentId, + long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException { + Stopwatch sw = new Stopwatch(); + sw.start(); + logger.info("Calculating cuboid " + cuboidId); + + GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, + measureColumns); + GTBuilder builder = newGridTable.rebuild(); + + ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns); + + GTRecord newRecord = new GTRecord(newGridTable.getInfo()); + int count = 0; + try { + for (GTRecord record : scanner) { + count++; + for (int i = 0; i < allNeededColumns.trueBitCount(); i++) { + int c = allNeededColumns.trueBitAt(i); + newRecord.set(i, record.get(c)); + } + builder.write(newRecord); + } + } finally { + scanner.close(); + builder.close(); + } + sw.stop(); + logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + sw.elapsedMillis() + "ms"); + + return updateCuboidResult(cuboidId, newGridTable, count, sw.elapsedMillis(), 0); + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java index 1beebc7..fc6edd3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java @@ -20,31 +20,21 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.gridtable.CubeGridTable; -import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; -import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; +import org.apache.kylin.cube.inmemcubing.ICuboidWriter; import org.apache.kylin.cube.inmemcubing.InputConverterUnit; import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid; import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.engine.mr.ByteArrayWritable; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; public class InMemCuboidFromBaseCuboidMapper extends InMemCuboidMapperBase { @@ -75,16 +65,8 @@ public class InMemCuboidFromBaseCuboidMapper } @Override - protected Future getCubingThreadFuture(Context context, Map> dictionaryMap, - int reserveMemoryMB, CuboidScheduler cuboidScheduler) { - AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap); - cubeBuilder.setReserveMemoryMB(reserveMemoryMB); - cubeBuilder.setConcurrentThreads(taskThreadCount); - - ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("inmemory-cube-building-from-base-cuboid-mapper-%d").build()); - return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit, - new MapContextGTRecordWriter(context, cubeDesc, cubeSegment))); + protected ICuboidWriter getCuboidWriter(Context context) { + return new MapContextGTRecordWriter(context, cubeDesc, cubeSegment); } @Override @@ -98,5 +80,4 @@ public class InMemCuboidFromBaseCuboidMapper return new ByteArray(keyValue); } - } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 551a17b..d363afc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -19,24 +19,15 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; -import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; +import org.apache.kylin.cube.inmemcubing.ICuboidWriter; import org.apache.kylin.cube.inmemcubing.InputConverterUnit; import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData; import org.apache.kylin.engine.mr.ByteArrayWritable; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.metadata.model.TblColRef; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; public class InMemCuboidMapper extends InMemCuboidMapperBase { @@ -63,15 +54,7 @@ public class InMemCuboidMapper } @Override - protected Future getCubingThreadFuture(Context context, Map> dictionaryMap, - int reserveMemoryMB, CuboidScheduler cuboidScheduler) { - AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap); - cubeBuilder.setReserveMemoryMB(reserveMemoryMB); - cubeBuilder.setConcurrentThreads(taskThreadCount); - - ExecutorService executorService = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build()); - return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit, - new MapContextGTRecordWriter(context, cubeDesc, cubeSegment))); + protected ICuboidWriter getCuboidWriter(Context context) { + return new MapContextGTRecordWriter(context, cubeDesc, cubeSegment); } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java index 73af138..f3b238d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java @@ -21,6 +21,8 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -33,7 +35,10 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController; +import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; +import org.apache.kylin.cube.inmemcubing.ICuboidWriter; import org.apache.kylin.cube.inmemcubing.InputConverterUnit; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; @@ -48,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** */ @@ -75,11 +81,10 @@ public abstract class InMemCuboidMapperBase protected abstract InputConverterUnit getInputConverterUnit(Context context); - protected abstract Future getCubingThreadFuture(Context context, Map> dictionaryMap, - int reserveMemoryMB, CuboidScheduler cuboidScheduler); - protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value); + protected abstract ICuboidWriter getCuboidWriter(Context context); + @Override protected void doSetup(Context context) throws IOException { super.bindCurrentConfiguration(context.getConfiguration()); @@ -117,7 +122,24 @@ public abstract class InMemCuboidMapperBase taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads(); reserveMemoryMB = calculateReserveMB(conf); inputConverterUnit = getInputConverterUnit(context); - future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler); + + AbstractInMemCubeBuilder cubeBuilder; + try { + cubeBuilder = (AbstractInMemCubeBuilder) Class.forName(cubeSegment.getConfig().getCubeInMemBuilderClass()) + .getConstructor(CuboidScheduler.class, IJoinedFlatTableDesc.class, Map.class) + .newInstance(cuboidScheduler, flatDesc, dictionaryMap); + } catch (Exception e) { + logger.warn("Fail to initialize cube builder by class name " + + cubeSegment.getConfig().getCubeInMemBuilderClass() + " due to " + e); + cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap); + } + cubeBuilder.setReserveMemoryMB(reserveMemoryMB); + cubeBuilder.setConcurrentThreads(taskThreadCount); + + ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build()); + future = executorService + .submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit, getCuboidWriter(context))); } private int calculateReserveMB(Configuration configuration) { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java index 60d0870..43b7ee2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.inmemcubing.ICuboidWriter; +import org.apache.kylin.cube.inmemcubing.ICuboidGTTableWriter; import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.ByteArrayWritable; @@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory; /** */ -public abstract class KVGTRecordWriter implements ICuboidWriter { +public abstract class KVGTRecordWriter extends ICuboidGTTableWriter { private static final Logger logger = LoggerFactory.getLogger(KVGTRecordWriter.class); diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java index be3d759..695455b 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java @@ -32,6 +32,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -101,6 +102,11 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase { } @Override + public void write(long cuboidId, GridTable table) throws IOException { + + } + + @Override public void flush() { } diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java index 8fcf9ed..6cfec84 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java @@ -37,8 +37,11 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.inmemcubing2.DoggedCubeBuilder2; +import org.apache.kylin.cube.inmemcubing2.InMemCubeBuilder2; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -48,6 +51,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; + /** */ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { @@ -89,10 +94,19 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { long randSeed = System.currentTimeMillis(); IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()); + InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap); + inmemBuilder.setConcurrentThreads(THREADS); + FileRecordWriter inmemResult = new FileRecordWriter(); + { + Future future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult)); + ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); + future.get(); + inmemResult.close(); + } + DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap); doggedBuilder.setConcurrentThreads(THREADS); FileRecordWriter doggedResult = new FileRecordWriter(); - { Future future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult)); ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS); @@ -100,20 +114,34 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { doggedResult.close(); } - InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap); - inmemBuilder.setConcurrentThreads(THREADS); - FileRecordWriter inmemResult = new FileRecordWriter(); - + InMemCubeBuilder2 inmemBuilder2 = new InMemCubeBuilder2(cube.getCuboidScheduler(), flatDesc, dictionaryMap); + inmemBuilder2.setConcurrentThreads(THREADS); + FileRecordWriter inmemResult2 = new FileRecordWriter(); { - Future future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult)); + Future future = executorService.submit(inmemBuilder2.buildAsRunnable(queue, inmemResult2)); ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); future.get(); - inmemResult.close(); + inmemResult2.close(); } + DoggedCubeBuilder2 doggedBuilder2 = new DoggedCubeBuilder2(cube.getCuboidScheduler(), flatDesc, dictionaryMap); + doggedBuilder2.setConcurrentThreads(THREADS); + FileRecordWriter doggedResult2 = new FileRecordWriter(); + { + Future future = executorService.submit(doggedBuilder2.buildAsRunnable(queue, doggedResult2)); + ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS); + future.get(); + doggedResult2.close(); + } + + fileCompare(inmemResult.file, inmemResult2.file); fileCompare(inmemResult.file, doggedResult.file); - doggedResult.file.delete(); + fileCompare2(inmemResult.file, doggedResult2.file); + inmemResult.file.delete(); + inmemResult2.file.delete(); + doggedResult.file.delete(); + doggedResult2.file.delete(); } private void fileCompare(File file, File file2) throws IOException { @@ -133,6 +161,27 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { r2.close(); } + private void fileCompare2(File file, File file2) throws IOException { + Map content1 = readContents(file); + Map content2 = readContents(file2); + assertEquals(content1, content2); + } + + private Map readContents(File file) throws IOException { + BufferedReader r = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8")); + Map content = Maps.newHashMap(); + String line; + while ((line = r.readLine()) != null) { + Integer cnt = content.get(line); + if (cnt == null) { + cnt = 0; + } + content.put(line, cnt + 1); + } + r.close(); + return content; + } + class FileRecordWriter implements ICuboidWriter { File file; @@ -152,6 +201,14 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { } @Override + public void write(long cuboidId, GridTable table) throws IOException { + writer.print(cuboidId); + writer.print(", "); + writer.print(table.toString()); + writer.println(); + } + + @Override public void flush() { } diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java index 9bfc18e..1e0a501 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java @@ -44,6 +44,7 @@ import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; @@ -275,6 +276,12 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { } @Override + public void write(long cuboidId, GridTable table) throws IOException { + if (verbose) + System.out.println(table.toString()); + } + + @Override public void flush() { if (verbose) { System.out.println("flush"); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java index e04e07d..a98f57c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.inmemcubing.ICuboidWriter; +import org.apache.kylin.cube.inmemcubing.ICuboidGTTableWriter; import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.HBaseColumnDesc; @@ -41,7 +41,7 @@ import com.google.common.collect.Lists; /** */ -public class HBaseCuboidWriter implements ICuboidWriter { +public class HBaseCuboidWriter extends ICuboidGTTableWriter { private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class); -- 2.5.4 (Apple Git-61)