From e1cb9c7ff0aee2a31c1df1541af59e83a6058cb6 Mon Sep 17 00:00:00 2001 From: gaodayue Date: Wed, 15 Mar 2017 22:45:02 +0800 Subject: [PATCH] KYLIN-2501 Stream Aggregate GTRecords at Query Server --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../apache/kylin/common/util/ImmutableBitSet.java | 29 ++- .../java/org/apache/kylin/GTForwardingScanner.java | 56 ++++++ .../apache/kylin/cube/gridtable/CubeGridTable.java | 18 -- .../cube/gridtable/CuboidToGridTableMapping.java | 18 ++ .../kylin/cube/inmemcubing/InMemCubeBuilder.java | 6 +- .../apache/kylin/gridtable/GTAggregateScanner.java | 16 +- .../apache/kylin/gridtable/GTFilterScanner.java | 22 +-- .../java/org/apache/kylin/gridtable/GTRecord.java | 80 +++----- .../org/apache/kylin/gridtable/GTScanRequest.java | 13 ++ .../kylin/gridtable/GTStreamAggregateScanner.java | 219 +++++++++++++++++++++ .../kylin/gridtable/GTScanReqSerDerTest.java | 4 +- .../org/apache/kylin/storage/StorageContext.java | 8 + .../storage/gtrecord/CubeScanRangePlanner.java | 3 +- .../kylin/storage/gtrecord/CubeSegmentScanner.java | 7 +- .../kylin/storage/gtrecord/CubeTupleConverter.java | 31 +-- .../storage/gtrecord/GTCubeStorageQueryBase.java | 31 ++- .../kylin/storage/gtrecord/ITupleConverter.java | 3 +- .../storage/gtrecord/PartitionResultIterator.java | 59 ++++++ .../kylin/storage/gtrecord/ScannerWorker.java | 5 +- .../storage/gtrecord/SegmentCubeTupleIterator.java | 71 ++++++- .../SortMergedPartitionResultIterator.java | 81 ++++++++ .../storage/gtrecord/StorageResponseGTScatter.java | 83 +++----- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 7 +- .../kylin/storage/hbase/cube/v2/CubeHBaseRPC.java | 5 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 5 +- 26 files changed, 675 insertions(+), 209 deletions(-) create mode 100644 core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java create mode 100644 core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java create mode 100644 core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java create mode 100644 core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 02349ad..9cd35c8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -802,6 +802,10 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.valueOf(getOptional("kylin.query.skip-empty-segments", "true")); } + public boolean isStreamAggregateEnabled() { + return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true")); + } + @Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold public int getStoragePushDownLimitMax() { return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000")); diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java index b417877..5cdf08c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java @@ -19,8 +19,9 @@ package org.apache.kylin.common.util; import java.nio.ByteBuffer; import java.util.BitSet; +import java.util.Iterator; -public class ImmutableBitSet { +public class ImmutableBitSet implements Iterable { public static final ImmutableBitSet EMPTY = new ImmutableBitSet(new BitSet()); @@ -168,4 +169,30 @@ public class ImmutableBitSet { return new ImmutableBitSet(bitSet); } }; + + /** + * Iterate over the positions of true value. + * @return the iterator + */ + @Override + public Iterator iterator() { + return new Iterator() { + int index = 0; + + @Override + public boolean hasNext() { + return index < arr.length; + } + + @Override + public Integer next() { + return arr[index++]; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } diff --git a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java new file mode 100644 index 0000000..de8c88d --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin; + +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.IGTScanner; + +import java.io.IOException; +import java.util.Iterator; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link IGTScanner} which forwards all its method calls to another scanner. + * + * @see decorator pattern. + */ +public class GTForwardingScanner implements IGTScanner { + protected IGTScanner delegated; + + protected GTForwardingScanner(IGTScanner delegated) { + this.delegated = checkNotNull(delegated, "delegated"); + } + + @Override + public GTInfo getInfo() { + return delegated.getInfo(); + } + + @Override + public void close() throws IOException { + delegated.close(); + } + + @Override + public Iterator iterator() { + return delegated.iterator(); + } +} diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java index 563cf43..5cee9df 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java @@ -18,29 +18,11 @@ package org.apache.kylin.cube.gridtable; -import java.util.Map; - -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.CubeDimEncMap; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dimension.IDimensionEncodingMap; import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.metadata.model.TblColRef; public class CubeGridTable { - - public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) { - Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId); - return newGTInfo(cuboid, new CubeDimEncMap(cubeSeg)); - } - - public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map> dictionaryMap) { - Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId); - return newGTInfo(cuboid, new CubeDimEncMap(cubeDesc, dictionaryMap)); - } - public static GTInfo newGTInfo(Cuboid cuboid, IDimensionEncodingMap dimEncMap) { CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java index 2e5dd12..6879687 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java @@ -140,11 +140,29 @@ public class CuboidToGridTableMapping { return i == null ? -1 : i.intValue(); } + public int[] getDimIndexes(Collection dims) { + int[] result = new int[dims.size()]; + int i = 0; + for (TblColRef dim : dims) { + result[i++] = getIndexOf(dim); + } + return result; + } + public int getIndexOf(FunctionDesc metric) { Integer r = metrics2gt.get(metric); return r == null ? -1 : r; } + public int[] getMetricsIndexes(Collection metrics) { + int[] result = new int[metrics.size()]; + int i = 0; + for (FunctionDesc metric : metrics) { + result[i++] = getIndexOf(metric); + } + return result; + } + public List getCuboidDimensionsInGTOrder() { return cuboid.getColumns(); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index e08844e..a26e948 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -38,6 +38,7 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.GTAggregateScanner; import org.apache.kylin.gridtable.GTBuilder; @@ -108,7 +109,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } private GridTable newGridTableByCuboidID(long cuboidID) throws IOException { - GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap); + GTInfo info = CubeGridTable.newGTInfo( + Cuboid.findById(cubeDesc, cuboidID), + new CubeDimEncMap(cubeDesc, dictionaryMap) + ); // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest. // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget); diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 7cdd4f5..0dd6fa9 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -45,7 +45,6 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.metadata.datatype.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +62,7 @@ public class GTAggregateScanner implements IGTScanner { final ImmutableBitSet metrics; final String[] metricsAggrFuncs; final IGTScanner inputScanner; + final BufferedMeasureCodec measureCodec; final AggregationCache aggrCache; final long spillThreshold; // 0 means no memory control && no spill final int storagePushDownLimit;//default to be Int.MAX @@ -86,6 +86,7 @@ public class GTAggregateScanner implements IGTScanner { this.metrics = req.getAggrMetrics(); this.metricsAggrFuncs = req.getAggrMetricsFuncs(); this.inputScanner = inputScanner; + this.measureCodec = req.createMeasureCodec(); this.aggrCache = new AggregationCache(); this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB); this.aggrMask = new boolean[metricsAggrFuncs.length]; @@ -175,7 +176,6 @@ public class GTAggregateScanner implements IGTScanner { final int keyLength; final boolean[] compareMask; boolean compareAll = true; - final BufferedMeasureCodec measureCodec; final Comparator bytesComparator = new Comparator() { @Override @@ -213,18 +213,6 @@ public class GTAggregateScanner implements IGTScanner { keyLength = compareMask.length; dumps = Lists.newArrayList(); aggBufMap = createBuffMap(); - measureCodec = createMeasureCodec(); - } - - private BufferedMeasureCodec createMeasureCodec() { - DataType[] types = new DataType[metrics.trueBitCount()]; - for (int i = 0; i < types.length; i++) { - types[i] = info.getColumnType(metrics.trueBitAt(i)); - } - - BufferedMeasureCodec result = new BufferedMeasureCodec(types); - result.setBufferSize(info.getMaxColumnLength(metrics)); - return result; } private boolean[] createCompareMask() { diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java index 717f89c..cad0a04 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; +import org.apache.kylin.GTForwardingScanner; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; @@ -33,17 +34,16 @@ import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.IEvaluatableTuple; -public class GTFilterScanner implements IGTScanner { +public class GTFilterScanner extends GTForwardingScanner { - final private IGTScanner inputScanner; final private TupleFilter filter; final private IFilterCodeSystem filterCodeSystem; final private IEvaluatableTuple oneTuple; // avoid instance creation private GTRecord next = null; - public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException { - this.inputScanner = inputScanner; + public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException { + super(delegated); this.filter = req.getFilterPushDown(); this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator()); this.oneTuple = new IEvaluatableTuple() { @@ -53,25 +53,15 @@ public class GTFilterScanner implements IGTScanner { } }; - if (TupleFilter.isEvaluableRecursively(filter) == false) + if (!TupleFilter.isEvaluableRecursively(filter)) throw new IllegalArgumentException(); } @Override - public GTInfo getInfo() { - return inputScanner.getInfo(); - } - - @Override - public void close() throws IOException { - inputScanner.close(); - } - - @Override public Iterator iterator() { return new Iterator() { - private Iterator inputIterator = inputScanner.iterator(); + private Iterator inputIterator = delegated.iterator(); private FilterResultCache resultCache = new FilterResultCache(getInfo(), filter); @Override diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index f4480c8..3397adc 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -21,7 +21,6 @@ package org.apache.kylin.gridtable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; -import java.util.List; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; @@ -46,18 +45,21 @@ public class GTRecord implements Comparable, Cloneable { } this.info = info; } - - public GTRecord(GTRecord other) { - this.info = other.info; - this.cols = new ByteArray[info.getColumnCount()]; - for (int i = 0; i < other.cols.length; i++) { - this.cols[i] = other.cols[i].copy(); + + @Override + public GTRecord clone() { // deep copy + ByteArray[] cols = new ByteArray[this.cols.length]; + for (int i = 0; i < cols.length; i++) { + cols[i] = this.cols[i].copy(); } + return new GTRecord(this.info, cols); } - @Override - public Object clone() { - return new GTRecord(this); + public void shallowCopyFrom(GTRecord source) { + assert info == source.info; + for (int i = 0; i < cols.length; i++) { + cols[i].set(source.cols[i]); + } } public GTInfo getInfo() { @@ -106,30 +108,18 @@ public class GTRecord implements Comparable, Cloneable { /** decode and return the values of this record */ public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) { assert selectedCols.cardinality() == result.length; - for (int i = 0; i < selectedCols.trueBitCount(); i++) { - int c = selectedCols.trueBitAt(i); - if (cols[c] == null || cols[c].array() == null) { - result[i] = null; - } else { - result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer()); - } + result[i] = decodeValue(selectedCols.trueBitAt(i)); } return result; } - /** decode and return the values of this record */ - public Object[] getValues(int[] selectedColumns, Object[] result) { - assert selectedColumns.length <= result.length; - for (int i = 0; i < selectedColumns.length; i++) { - int c = selectedColumns[i]; - if (cols[c].array() == null) { - result[i] = null; - } else { - result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer()); - } + public Object decodeValue(int c) { + ByteArray col = cols[c]; + if (col != null && col.array() != null) { + return info.codeSystem.decodeColumnValue(c, col.asBuffer()); } - return result; + return null; } public int sizeOf(ImmutableBitSet selectedCols) { @@ -198,19 +188,13 @@ public class GTRecord implements Comparable, Cloneable { return compareToInternal(o, info.colAll); } - public int compareToOnPrimaryKey(GTRecord o) { - return compareToInternal(o, info.primaryKey); - } - - public static Comparator getPrimaryKeyComparator() { + public static Comparator getComparator(final ImmutableBitSet participateCols) { return new Comparator() { - @Override public int compare(GTRecord o1, GTRecord o2) { if (o1 == null || o2 == null) { throw new IllegalStateException("Cannot handle null"); } - - return o1.compareToOnPrimaryKey(o2); + return o1.compareToInternal(o2, participateCols); } }; } @@ -287,26 +271,14 @@ public class GTRecord implements Comparable, Cloneable { loadColumns(info.colBlocks[c], buf); } - /** change pointers to point to data in given buffer, UNLIKE deserialize */ - public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) { - int pos = buf.position(); - for (int i = 0; i < selectedCols.trueBitCount(); i++) { - int c = selectedCols.trueBitAt(i); - int len = info.codeSystem.codeLength(c, buf); - cols[c].set(buf.array(), buf.arrayOffset() + pos, len); - pos += len; - buf.position(pos); - } - } - - /** change pointers to point to data in given buffer, UNLIKE deserialize - * unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this - * method allows to defined specific columns(in order) to load + /** + * Change pointers to point to data in given buffer, UNLIKE deserialize + * @param selectedCols positions of column to load + * @param buf buffer containing continuous data of selected columns */ - public void loadColumns(List selectedCols, ByteBuffer buf) { + public void loadColumns(Iterable selectedCols, ByteBuffer buf) { int pos = buf.position(); - for (int i = 0; i < selectedCols.size(); i++) { - int c = selectedCols.get(i); + for (int c : selectedCols) { int len = info.codeSystem.codeLength(c, buf); cols[c].set(buf.array(), buf.arrayOffset() + pos, len); pos += len; diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 4629c8e..ae35d2b 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -31,6 +31,8 @@ import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.SerializeToByteBuffer; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -202,6 +204,17 @@ public class GTScanRequest { } + public BufferedMeasureCodec createMeasureCodec() { + DataType[] metricTypes = new DataType[aggrMetrics.trueBitCount()]; + for (int i = 0; i < metricTypes.length; i++) { + metricTypes[i] = info.getColumnType(aggrMetrics.trueBitAt(i)); + } + + BufferedMeasureCodec codec = new BufferedMeasureCodec(metricTypes); + codec.setBufferSize(info.getMaxColumnLength(aggrMetrics)); + return codec; + } + public boolean isDoingStorageAggregation() { return doingStorageAggregation; } diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java new file mode 100644 index 0000000..4eb5791 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.gridtable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import org.apache.kylin.GTForwardingScanner; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.MeasureAggregator; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * GTStreamAggregateScanner requires input records to be sorted on group fields. + * In such cases, it's superior to hash/sort based aggregator because it can produce + * ordered outputs on the fly and the memory consumption is very low. + */ +public class GTStreamAggregateScanner extends GTForwardingScanner { + private final GTScanRequest req; + private final Comparator keyComparator; + + public GTStreamAggregateScanner(IGTScanner delegated, GTScanRequest scanRequest) { + super(delegated); + this.req = Preconditions.checkNotNull(scanRequest, "scanRequest"); + this.keyComparator = GTRecord.getComparator(scanRequest.getAggrGroupBy()); + } + + @Override + public Iterator iterator() { + return new StreamMergeGTRecordIterator(delegated.iterator()); + } + + public Iterator valuesIterator(int[] gtDimsIdx, int[] gtMetricsIdx) { + return new StreamMergeValuesIterator(delegated.iterator(), gtDimsIdx, gtMetricsIdx); + } + + private abstract class AbstractStreamMergeIterator implements Iterator { + final PeekingIterator input; + final IGTCodeSystem codeSystem; + final ImmutableBitSet dimensions; + final ImmutableBitSet metrics; + final String[] metricFuncs; + final BufferedMeasureCodec measureCodec; + + private final GTRecord first; // reuse to avoid object creation + + AbstractStreamMergeIterator(Iterator input) { + this.input = Iterators.peekingIterator(input); + this.codeSystem = req.getInfo().getCodeSystem(); + this.dimensions = req.getDimensions(); + this.metrics = req.getAggrMetrics(); + this.metricFuncs = req.getAggrMetricsFuncs(); + this.measureCodec = req.createMeasureCodec(); + + this.first = new GTRecord(req.getInfo()); + } + + @Override + public boolean hasNext() { + return input.hasNext(); + } + + private boolean isSameKey(GTRecord o1, GTRecord o2) { + return keyComparator.compare(o1, o2) == 0; + } + + private boolean shouldMergeNext(GTRecord current) { + return input.hasNext() && isSameKey(current, input.peek()); + } + + protected abstract E finalizeResult(GTRecord record); + + protected abstract E finalizeResult(GTRecord record, Object[] aggStates); + + @Override + public E next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + // WATCH OUT! record returned by "input" scanner could be changed later, + // so we must make a shallow copy of it. + first.shallowCopyFrom(input.next()); + + // shortcut to avoid extra deserialize/serialize cost + if (!shouldMergeNext(first)) { + return finalizeResult(first); + } + // merge records with the same key + MeasureAggregator[] aggrs = codeSystem.newMetricsAggregators(metrics, metricFuncs); + aggregate(aggrs, first); + aggregate(aggrs, input.next()); // no need to copy record because it's not referred to later + while (shouldMergeNext(first)) { + aggregate(aggrs, input.next()); + } + + Object[] aggStates = new Object[aggrs.length]; + for (int i = 0; i < aggStates.length; i++) { + aggStates[i] = aggrs[i].getState(); + } + return finalizeResult(first, aggStates); + } + + @SuppressWarnings("unchecked") + protected void aggregate(MeasureAggregator[] aggregators, GTRecord record) { + for (int i = 0; i < aggregators.length; i++) { + int c = metrics.trueBitAt(i); + Object metric = codeSystem.decodeColumnValue(c, record.cols[c].asBuffer()); + aggregators[i].aggregate(metric); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + } + + private class StreamMergeGTRecordIterator extends AbstractStreamMergeIterator { + + private GTRecord returnRecord; // avoid object creation + + StreamMergeGTRecordIterator(Iterator input) { + super(input); + this.returnRecord = new GTRecord(req.getInfo()); + } + + @Override + protected GTRecord finalizeResult(GTRecord record) { + return record; + } + + @Override + protected GTRecord finalizeResult(GTRecord record, Object[] aggStates) { + // 1. load dimensions + for (int c : dimensions) { + returnRecord.cols[c] = record.cols[c]; + } + // 2. serialize metrics + byte[] bytes = measureCodec.encode(aggStates).array(); + int[] sizes = measureCodec.getMeasureSizes(); + // 3. load metrics + int offset = 0; + for (int i = 0; i < metrics.trueBitCount(); i++) { + int c = metrics.trueBitAt(i); + returnRecord.cols[c].set(bytes, offset, sizes[i]); + offset += sizes[i]; + } + return returnRecord; + } + } + + private class StreamMergeValuesIterator extends AbstractStreamMergeIterator { + + private int[] gtDimsIdx; + private int[] gtMetricsIdx; // specify which metric to return and their order + private int[] aggIdx; // specify the ith returning metric's aggStates index + + private Object[] result; // avoid object creation + + StreamMergeValuesIterator(Iterator input, int[] gtDimsIdx, int[] gtMetricsIdx) { + super(input); + this.gtDimsIdx = gtDimsIdx; + this.gtMetricsIdx = gtMetricsIdx; + this.aggIdx = new int[gtMetricsIdx.length]; + for (int i = 0; i < aggIdx.length; i++) { + int metricIdx = gtMetricsIdx[i]; + aggIdx[i] = metrics.trueBitIndexOf(metricIdx); + } + + this.result = new Object[gtDimsIdx.length + gtMetricsIdx.length]; + } + + private void decodeAndSetDimensions(GTRecord record) { + for (int i = 0; i < gtDimsIdx.length; i++) { + result[i] = record.decodeValue(gtDimsIdx[i]); + } + } + + @Override + protected Object[] finalizeResult(GTRecord record) { + decodeAndSetDimensions(record); + // decode metrics + for (int i = 0; i < gtMetricsIdx.length; i++) { + result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]); + } + return result; + } + + @Override + protected Object[] finalizeResult(GTRecord record, Object[] aggStates) { + decodeAndSetDimensions(record); + // set metrics + for (int i = 0; i < aggIdx.length; i++) { + result[gtDimsIdx.length + i] = aggStates[aggIdx[i]]; + } + return result; + } + } +} diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java index 77cc2d8..1ae229a 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java @@ -29,6 +29,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.kv.CubeDimEncMap; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -90,7 +91,8 @@ public class GTScanReqSerDerTest extends LocalFileMetadataTestCase { CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("test_kylin_cube_with_slr_ready"); CubeSegment segment = cube.getFirstSegment(); - GTInfo info = CubeGridTable.newGTInfo(segment, Cuboid.getBaseCuboidId(cube.getDescriptor())); + Cuboid baseCuboid = Cuboid.getBaseCuboid(cube.getDescriptor()); + GTInfo info = CubeGridTable.newGTInfo(baseCuboid, new CubeDimEncMap(segment)); GTInfo.serializer.serialize(info, buffer); buffer.flip(); diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 998f1db..4522261 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -47,6 +47,7 @@ public class StorageContext { private boolean exactAggregation = false; private boolean needStorageAggregation = false; private boolean enableCoprocessor = false; + private boolean enableStreamAggregate = false; private IStorageQuery storageQuery; private AtomicLong processedRowCount = new AtomicLong(); @@ -230,4 +231,11 @@ public class StorageContext { this.storageQuery = storageQuery; } + public boolean isStreamAggregateEnabled() { + return enableStreamAggregate; + } + + public void enableStreamAggregate() { + this.enableStreamAggregate = true; + } } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java index 6911827..c3cc858 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -38,6 +38,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.gridtable.RecordComparators; import org.apache.kylin.cube.gridtable.ScanRangePlannerBase; import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd; +import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; @@ -85,7 +86,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { Set filterDims = Sets.newHashSet(); TupleFilter.collectColumns(filter, filterDims); - this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId()); + this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment)); CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); IGTComparator comp = gtInfo.getCodeSystem().getComparator(); diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index 4f206d4..31a9f99 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -78,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner { } scanRequest = scanRangePlanner.planScanRequest(); String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage(); - scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage); + scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context); } @Override @@ -96,8 +96,7 @@ public class CubeSegmentScanner implements IGTScanner { return scanRequest == null ? null : scanRequest.getInfo(); } - public CubeSegment getSegment() { - return this.cubeSeg; + public GTScanRequest getScanRequest() { + return scanRequest; } - } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java index 280718f..b762e5c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java @@ -28,10 +28,8 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.model.FunctionDesc; @@ -43,7 +41,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** - * convert GTRecord to tuple + * Convert Object[] (decoded GTRecord) to tuple */ public class CubeTupleConverter implements ITupleConverter { @@ -54,7 +52,6 @@ public class CubeTupleConverter implements ITupleConverter { private final int[] gtColIdx; private final int[] tupleIdx; - private final Object[] gtValues; private final MeasureType[] measureTypes; private final List advMeasureFillers; @@ -63,19 +60,16 @@ public class CubeTupleConverter implements ITupleConverter { private final int nSelectedDims; public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, // - Set selectedDimensions, Set selectedMetrics, TupleInfo returnTupleInfo) { + Set selectedDimensions, Set selectedMetrics, int[] gtColIdx, + TupleInfo returnTupleInfo) { this.cubeSeg = cubeSeg; this.cuboid = cuboid; + this.gtColIdx = gtColIdx; this.tupleInfo = returnTupleInfo; this.derivedColFillers = Lists.newArrayList(); - List cuboidDims = cuboid.getColumns(); - CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); - nSelectedDims = selectedDimensions.size(); - gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()]; tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()]; - gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()]; // measure types don't have this many, but aligned length make programming easier measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()]; @@ -89,21 +83,11 @@ public class CubeTupleConverter implements ITupleConverter { // pre-calculate dimension index mapping to tuple for (TblColRef dim : selectedDimensions) { - int dimIndex = mapping.getIndexOf(dim); - gtColIdx[i] = dimIndex; tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1; - - // if (tupleIdx[iii] == -1) { - // throw new IllegalStateException("dim not used in tuple:" + dim); - // } - i++; } for (FunctionDesc metric : selectedMetrics) { - int metricIndex = mapping.getIndexOf(metric); - gtColIdx[i] = metricIndex; - if (metric.needRewrite()) { String rewriteFieldName = metric.getRewriteFieldName(); tupleIdx[i] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; @@ -126,7 +110,7 @@ public class CubeTupleConverter implements ITupleConverter { } // prepare derived columns and filler - Map, List> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null); + Map, List> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboid.getColumns(), null); for (Entry, List> entry : hostToDerivedInfo.entrySet()) { TblColRef[] hostCols = entry.getKey().data; for (DeriveInfo deriveInfo : entry.getValue()) { @@ -148,9 +132,8 @@ public class CubeTupleConverter implements ITupleConverter { } @Override - public List translateResult(GTRecord record, Tuple tuple) { - - record.getValues(gtColIdx, gtValues); + public List translateResult(Object[] gtValues, Tuple tuple) { + assert gtValues.length == gtColIdx.length; // dimensions for (int i = 0; i < nSelectedDims; i++) { diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index ecf1ad3..d91a0b4 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -120,6 +120,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { // set limit push down enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context); + // set whether to aggregate results from multiple partitions + enableStreamAggregateIfBeneficial(cuboid, groupsD, context); // set query deadline context.setDeadline(cubeInstance); @@ -144,8 +146,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { protected abstract String getGTStorage(); - protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set selectedDimensions, Set selectedMetrics, TupleInfo tupleInfo) { - return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); + protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set selectedDimensions, Set selectedMetrics, int[] gtColIdx, TupleInfo tupleInfo) { + return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo); } protected void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection dimensions, Collection metrics) { @@ -366,6 +368,31 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } + private void enableStreamAggregateIfBeneficial(Cuboid cuboid, Set groupsD, StorageContext context) { + CubeDesc cubeDesc = cuboid.getCubeDesc(); + boolean enabled = cubeDesc.getConfig().isStreamAggregateEnabled(); + + Set shardByInGroups = Sets.newHashSet(); + for (TblColRef col : cubeDesc.getShardByColumns()) { + if (groupsD.contains(col)) { + shardByInGroups.add(col); + } + } + if (!shardByInGroups.isEmpty()) { + enabled = false; + logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: " + shardByInGroups); + } + + if (!context.isNeedStorageAggregation()) { + enabled = false; + logger.debug("Aggregate partition results is not beneficial because no storage aggregation"); + } + + if (enabled) { + context.enableStreamAggregate(); + } + } + protected void notifyBeforeStorageQuery(SQLDigest sqlDigest) { Map> map = Maps.newHashMap(); for (MeasureDesc measure : cubeDesc.getMeasures()) { diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java index 9c50d0c..dd48e4d 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java @@ -20,11 +20,10 @@ package org.apache.kylin.storage.gtrecord; import java.util.List; -import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.tuple.Tuple; public interface ITupleConverter { - public List translateResult(GTRecord record, Tuple tuple); + public List translateResult(Object[] gtValues, Tuple tuple); } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java new file mode 100644 index 0000000..474e1e0 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import com.google.common.collect.UnmodifiableIterator; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; + +import java.nio.ByteBuffer; +import java.util.NoSuchElementException; + +/** + * Support iterate over {@code GTRecord}s in storage partition result. + * + *

Note that the implementation returns the same object for next(). + * Client needs to copy the returned record when needed. + */ +public class PartitionResultIterator extends UnmodifiableIterator { + private final ByteBuffer buffer; + private final ImmutableBitSet cols; + private final GTRecord record; // reuse to avoid object creation + + public PartitionResultIterator(byte[] data, GTInfo info, ImmutableBitSet cols) { + this.buffer = ByteBuffer.wrap(data); + this.cols = cols; + this.record = new GTRecord(info); + } + + @Override + public boolean hasNext() { + return buffer.hasRemaining(); + } + + @Override + public GTRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + record.loadColumns(cols, buffer); + return record; + } +} diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java index 9e89227..fe22e9c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java @@ -30,6 +30,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStorage; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ public class ScannerWorker { private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class); private IGTScanner internal = null; - public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) { + public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) { if (scanRequest == null) { logger.info("Segment {} will be skipped", segment); internal = new EmptyGTScanner(); @@ -48,7 +49,7 @@ public class ScannerWorker { final GTInfo info = scanRequest.getInfo(); try { - IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior + IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior internal = rpc.getGTScanner(scanRequest); } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) { throw new RuntimeException(e); diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java index 37699a3..3bac5ec 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java @@ -24,8 +24,14 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Set; +import com.google.common.collect.UnmodifiableIterator; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; +import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTStreamAggregateScanner; +import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -49,7 +55,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator { protected final Tuple tuple; protected final StorageContext context; - protected Iterator gtItr; + protected Iterator gtValues; protected ITupleConverter cubeTupleConverter; protected Tuple next; @@ -66,12 +72,61 @@ public class SegmentCubeTupleIterator implements ITupleIterator { this.tupleInfo = returnTupleInfo; this.tuple = new Tuple(returnTupleInfo); this.context = context; - this.gtItr = getGTItr(scanner); - this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); + + CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); + int[] gtDimsIdx = mapping.getDimIndexes(selectedDimensions); + int[] gtMetricsIdx = mapping.getMetricsIndexes(selectedMetrics); + // gtColIdx = gtDimsIdx + gtMetricsIdx + int[] gtColIdx = new int[gtDimsIdx.length + gtMetricsIdx.length]; + System.arraycopy(gtDimsIdx, 0, gtColIdx, 0, gtDimsIdx.length); + System.arraycopy(gtMetricsIdx, 0, gtColIdx, gtDimsIdx.length, gtMetricsIdx.length); + + this.gtValues = getGTValuesIterator(scanner.iterator(), scanner.getScanRequest(), gtDimsIdx, gtMetricsIdx); + this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter( + scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo); } - private Iterator getGTItr(CubeSegmentScanner scanner) { - return scanner.iterator(); + private Iterator getGTValuesIterator( + final Iterator records, final GTScanRequest scanRequest, + final int[] gtDimsIdx, final int[] gtMetricsIdx) { + + boolean hasMultiplePartitions = records instanceof SortMergedPartitionResultIterator; + if (hasMultiplePartitions && context.isStreamAggregateEnabled()) { + // input records are ordered, leverage stream aggregator to produce possibly fewer records + IGTScanner inputScanner = new IGTScanner() { + public GTInfo getInfo() { + return scanRequest.getInfo(); + } + + public void close() throws IOException {} + + public Iterator iterator() { + return records; + } + }; + GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest); + return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx); + } + + // simply decode records + return new UnmodifiableIterator() { + Object[] result = new Object[gtDimsIdx.length + gtMetricsIdx.length]; + + public boolean hasNext() { + return records.hasNext(); + } + + public Object[] next() { + GTRecord record = records.next(); + for (int i = 0; i < gtDimsIdx.length; i++) { + result[i] = record.decodeValue(gtDimsIdx[i]); + } + for (int i = 0; i < gtMetricsIdx.length; i++) { + result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]); + } + return result; + } + }; } @Override @@ -91,13 +146,13 @@ public class SegmentCubeTupleIterator implements ITupleIterator { } // now we have a GTRecord - if (!gtItr.hasNext()) { + if (!gtValues.hasNext()) { return false; } - GTRecord curRecord = gtItr.next(); + Object[] gtValues = this.gtValues.next(); // translate into tuple - advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple); + advMeasureFillers = cubeTupleConverter.translateResult(gtValues, tuple); // the simple case if (advMeasureFillers == null) { diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java new file mode 100644 index 0000000..21e61e3 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.collect.UnmodifiableIterator; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; + +import java.util.Comparator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; + +/** + * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements. + */ +public class SortMergedPartitionResultIterator extends UnmodifiableIterator { + + final GTRecord record ; // reuse to avoid object creation + PriorityQueue> heap; + + SortMergedPartitionResultIterator( + List partitionResults, + GTInfo info, final Comparator comparator) { + + this.record = new GTRecord(info); + Comparator> heapComparator = new Comparator>() { + public int compare(PeekingIterator o1, PeekingIterator o2) { + return comparator.compare(o1.peek(), o2.peek()); + } + }; + this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator); + + for (PartitionResultIterator it : partitionResults) { + if (it.hasNext()) { + heap.offer(Iterators.peekingIterator(it)); + } + } + } + + @Override + public boolean hasNext() { + return !heap.isEmpty(); + } + + @Override + public GTRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + // get smallest record + PeekingIterator it = heap.poll(); + // WATCH OUT! record got from PartitionResultIterator.next() may changed later, + // so we must make a shallow copy of it. + record.shallowCopyFrom(it.next()); + + if (it.hasNext()) { + heap.offer(it); + } + + return record; + } +} diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java index 1a80bbf..f1ab20c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java @@ -18,22 +18,20 @@ package org.apache.kylin.storage.gtrecord; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Iterator; - -import javax.annotation.Nullable; - +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.storage.StorageContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; /** * scatter the blob returned from region server to a iterable of gtrecords @@ -42,18 +40,20 @@ public class StorageResponseGTScatter implements IGTScanner { private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class); - private GTInfo info; + private final GTInfo info; private IPartitionStreamer partitionStreamer; - private Iterator blocks; - private ImmutableBitSet columns; - private int storagePushDownLimit = -1; + private final Iterator blocks; + private final ImmutableBitSet columns; + private final ImmutableBitSet groupByDims; + private final boolean needSorted; // whether scanner should return sorted records - public StorageResponseGTScatter(GTInfo info, IPartitionStreamer partitionStreamer, ImmutableBitSet columns, int storagePushDownLimit) { - this.info = info; + public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) { + this.info = scanRequest.getInfo(); this.partitionStreamer = partitionStreamer; this.blocks = partitionStreamer.asByteArrayIterator(); - this.columns = columns; - this.storagePushDownLimit = storagePushDownLimit; + this.columns = scanRequest.getColumns(); + this.groupByDims = scanRequest.getAggrGroupBy(); + this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled(); } @Override @@ -69,48 +69,21 @@ public class StorageResponseGTScatter implements IGTScanner { @Override public Iterator iterator() { - Iterator> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc()); - if (StorageContext.mergeSortPartitionResults(storagePushDownLimit)) { - logger.info("Using SortedIteratorMergerWithLimit to merge partition results"); - return new SortedIteratorMergerWithLimit(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator(); - } else { - logger.info("Using Iterators.concat to merge partition results"); - return Iterators.concat(shardSubsets); + List partitionResults = Lists.newArrayList(); + while (blocks.hasNext()) { + partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns)); } - } - - class EndpointResponseGTScatterFunc implements Function> { - @Nullable - @Override - public Iterator apply(@Nullable final byte[] input) { - - return new Iterator() { - private ByteBuffer inputBuffer = null; - //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord - private GTRecord firstRecord = null; - - @Override - public boolean hasNext() { - if (inputBuffer == null) { - inputBuffer = ByteBuffer.wrap(input); - firstRecord = new GTRecord(info); - } - return inputBuffer.position() < inputBuffer.limit(); - } - - @Override - public GTRecord next() { - firstRecord.loadColumns(columns, inputBuffer); - return firstRecord; - } + if (partitionResults.size() == 1) { + return partitionResults.get(0); + } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; + if (!needSorted) { + logger.debug("Using Iterators.concat to merge partition results"); + return Iterators.concat(partitionResults.iterator()); } - } + logger.debug("Using SortMergedPartitionResultIterator to merge partition results"); + return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims)); + } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 82b67b6..e822ada 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -47,6 +47,7 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer; import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -69,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static ExecutorService executorService = new LoggableCachedThreadPool(); - public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { - super(segment, cuboid, fullGTInfo); + public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { + super(segment, cuboid, fullGTInfo, context); } private byte[] getByteArrayForShort(short v) { @@ -245,7 +246,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { }); } - return new StorageResponseGTScatter(fullGTInfo, new DummyPartitionStreamer(epResultItr), scanRequest.getColumns(), scanRequest.getStoragePushDownLimit()); + return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext); } private ByteString serializeGTScanReq(GTScanRequest scanRequest) { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 88e7176..db81646 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.IGTStorage; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,17 +65,19 @@ public abstract class CubeHBaseRPC implements IGTStorage { final protected Cuboid cuboid; final protected GTInfo fullGTInfo; final protected QueryContext queryContext; + final protected StorageContext storageContext; final private RowKeyEncoder fuzzyKeyEncoder; final private RowKeyEncoder fuzzyMaskEncoder; - public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { + public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment"); this.cubeSeg = (CubeSegment) segment; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; this.queryContext = QueryContext.current(); + this.storageContext = context; this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid); this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index 33f8d90..951e2ef 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,8 +88,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } } - public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) { - super(segment, cuboid, fullGTInfo); + public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) { + super(segment, cuboid, fullGTInfo, context); } @Override -- 2.9.3 (Apple Git-75)