Server: SimpleHTTP/0.6 Python/2.7.3 Date: Fri, 13 Oct 2017 04:11:02 GMT Content-type: text/x-diff Content-Length: 3437 Last-Modified: Fri, 13 Oct 2017 04:09:53 GMT From 4bf04c663606e389b61742aa76c63ee02b15c948 Mon Sep 17 00:00:00 2001 From: hzfengyu Date: Fri, 13 Oct 2017 12:09:41 +0800 Subject: [PATCH] KYLIN-2926, DumpMerger return incorrect results, create codec for each dump Signed-off-by: hzfengyu --- .../java/org/apache/kylin/gridtable/GTAggregateScanner.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 0dd6fa9..07bec88 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -57,6 +57,7 @@ public class GTAggregateScanner implements IGTScanner { private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class); final GTInfo info; + final GTScanRequest request; final ImmutableBitSet dimensions; // dimensions to return, can be more than group by final ImmutableBitSet groupBy; final ImmutableBitSet metrics; @@ -81,6 +82,7 @@ public class GTAggregateScanner implements IGTScanner { throw new IllegalStateException(); this.info = inputScanner.getInfo(); + this.request = req; this.dimensions = req.getDimensions(); this.groupBy = req.getAggrGroupBy(); this.metrics = req.getAggrMetrics(); @@ -507,6 +509,7 @@ public class GTAggregateScanner implements IGTScanner { final PriorityQueue> minHeap; final List>> dumpIterators; final List dumpCurrentValues; + final List codecs; final MeasureAggregator[] resultMeasureAggregators = newAggregators(); final MeasureAggregators resultAggrs = new MeasureAggregators(resultMeasureAggregators); @@ -519,11 +522,13 @@ public class GTAggregateScanner implements IGTScanner { }); dumpIterators = Lists.newArrayListWithCapacity(dumps.size()); dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size()); + codecs = Lists.newArrayListWithCapacity(dumps.size()); Iterator> it; for (int i = 0; i < dumps.size(); i++) { it = dumps.get(i).iterator(); dumpCurrentValues.add(i, null); + codecs.add(i, request.createMeasureCodec()); if (it.hasNext()) { dumpIterators.add(i, it); enqueueFromDump(i); @@ -538,7 +543,8 @@ public class GTAggregateScanner implements IGTScanner { Pair pair = dumpIterators.get(index).next(); minHeap.offer(new Pair(pair.getKey(), index)); Object[] metricValues = new Object[metrics.trueBitCount()]; - measureCodec.decode(ByteBuffer.wrap(pair.getValue()), metricValues); + BufferedMeasureCodec codec= codecs.get(index); + codec.decode(ByteBuffer.wrap(pair.getValue()), metricValues); dumpCurrentValues.set(index, metricValues); } } -- 1.7.10.4