From b07e69e6f8395582c251b609bd63d55a0c6bf118 Mon Sep 17 00:00:00 2001 From: lidongsjtu Date: Mon, 26 Oct 2015 17:17:05 +0800 Subject: [PATCH] KYLIN-1016 Count distinct on any dimension should work even not a predefined measure --- .../apache/kylin/cube/CubeCapabilityChecker.java | 24 ++++--- .../org/apache/kylin/cube/kv/RowValueDecoder.java | 15 +++- .../kylin/query/enumerator/CubeEnumerator.java | 4 +- .../org/apache/kylin/query/schema/OLAPTable.java | 37 ++++++++-- .../query/sqlfunc/HLLDistinctCountAggFunc.java | 84 ++++++++++++++++++++++ query/src/test/resources/query/sql/query82.sql | 21 ++++++ query/src/test/resources/query/sql/query83.sql | 24 +++++++ .../storage/hbase/CubeSegmentTupleIterator.java | 66 ++++++++++++----- .../kylin/storage/hbase/CubeStorageEngine.java | 60 +++++++++++----- 9 files changed, 282 insertions(+), 53 deletions(-) create mode 100644 query/src/test/resources/query/sql/query82.sql create mode 100644 query/src/test/resources/query/sql/query83.sql diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java index e713774..796e95d 100644 --- a/cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java +++ b/cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java @@ -21,6 +21,7 @@ package org.apache.kylin.cube; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.DimensionDesc; @@ -79,7 +80,15 @@ public class CubeCapabilityChecker { private static boolean isMatchedWithAggregations(Collection aggregations, CubeInstance cube) { CubeDesc cubeDesc = cube.getDescriptor(); - boolean matchAgg = cubeDesc.listAllFunctions().containsAll(aggregations); + boolean matchAgg = true; + List cubeFunctions = cubeDesc.listAllFunctions(); + Set cubeDimensions = cubeDesc.listDimensionColumnsIncludingDerived(); + for (FunctionDesc aggFuncDesc : aggregations) { + matchAgg = cubeFunctions.contains(aggFuncDesc) || (aggFuncDesc.isCountDistinct() && cubeDimensions.containsAll(aggFuncDesc.getParameter().getColRefs())); + if (!matchAgg) { + break; + } + } return matchAgg; } @@ -121,7 +130,7 @@ public class CubeCapabilityChecker { private static boolean isWeaklyMatchedWithAggregations(Collection aggregations, Collection metricColumns, CubeInstance cube) { CubeDesc cubeDesc = cube.getDescriptor(); Collection cubeFuncs = cubeDesc.listAllFunctions(); - + Set cubeDimensions = cubeDesc.listDimensionColumnsIncludingDerived(); boolean matched = true; for (FunctionDesc functionDesc : aggregations) { if (cubeFuncs.contains(functionDesc)) @@ -131,16 +140,13 @@ public class CubeCapabilityChecker { if (functionDesc.isCount()) continue; - if (functionDesc.isCountDistinct()) // calcite can not handle distinct count + List parameterColRefs = functionDesc.getParameter().getColRefs(); + if (functionDesc.isCountDistinct() && !cubeDimensions.containsAll(parameterColRefs)) // calcite can not handle distinct count matched = false; - TblColRef col = null; - if (functionDesc.getParameter().getColRefs().size() > 0) - col = functionDesc.getParameter().getColRefs().get(0); - - if (col == null || !cubeDesc.listDimensionColumnsIncludingDerived().contains(col)) { + TblColRef col = parameterColRefs.size() > 0 ? parameterColRefs.get(0) : null; + if (col == null || !cubeDimensions.contains(col)) matched = false; - } } return matched; } diff --git a/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java b/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java index f90a88d..b04abae 100644 --- a/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java +++ b/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java @@ -71,6 +71,18 @@ public class RowValueDecoder implements Cloneable { this.values = new Object[measures.length]; } + public RowValueDecoder(MeasureDesc[] measureDescs) { + this.hbaseColumn = null; + this.projectionIndex = new BitSet(); + this.names = new ArrayList(); + this.measures = measureDescs; + for (MeasureDesc measure : measures) { + this.names.add(measure.getFunction().getRewriteFieldName()); + } + this.codec = new MeasureCodec(measures); + this.values = new Object[measures.length]; + } + public void decode(byte[] bytes) { codec.decode(ByteBuffer.wrap(bytes), values); convertToJavaObjects(values, values); @@ -120,7 +132,8 @@ public class RowValueDecoder implements Cloneable { public boolean hasMemHungryCountDistinct() { for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { FunctionDesc func = measures[i].getFunction(); - if (func.isCountDistinct() && !func.isHolisticCountDistinct()) { + // Set-based distinct count walks same way with HyperLogLog + if (func.isCountDistinct() && (hbaseColumn == null || !func.isHolisticCountDistinct())) { return true; } } diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/CubeEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/CubeEnumerator.java index 66a4035..537501a 100644 --- a/query/src/main/java/org/apache/kylin/query/enumerator/CubeEnumerator.java +++ b/query/src/main/java/org/apache/kylin/query/enumerator/CubeEnumerator.java @@ -214,10 +214,10 @@ public class CubeEnumerator implements Enumerator { // For measure columns, take them as metric columns with aggregation function SUM(). else { ParameterDesc colParameter = new ParameterDesc(); - colParameter.setType("column"); + colParameter.setType(FunctionDesc.PARAMETER_TYPE_COLUMN); colParameter.setValue(col.getName()); FunctionDesc sumFunc = new FunctionDesc(); - sumFunc.setExpression("SUM"); + sumFunc.setExpression(FunctionDesc.FUNC_SUM); sumFunc.setParameter(colParameter); boolean measureHasSum = false; diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java index bbf1024..f051e58 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java @@ -43,10 +43,7 @@ import org.apache.calcite.schema.impl.AbstractTableQueryable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.*; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.query.enumerator.OLAPQuery; import org.apache.kylin.query.enumerator.OLAPQuery.EnumeratorTypeEnum; @@ -166,6 +163,7 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab List tableColumns = Lists.newArrayList(mgr.listExposedColumns(olapSchema.getProjectName(), sourceTable.getIdentity())); List countMeasures = mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity(), true); + HashSet measureCols = Sets.newHashSet(); HashSet metFields = new HashSet(); List metricColumns = Lists.newArrayList(); for (MeasureDesc m : countMeasures) { @@ -181,6 +179,37 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab fakeCountCol.init(sourceTable); metricColumns.add(fakeCountCol); } + + if (func.getParameter().getColRefs() != null) { + for (TblColRef paramCol : func.getParameter().getColRefs()) { + measureCols.add(paramCol.getColumn()); + } + } + } + + // creates DistinctCount measure column for each dimension column, as placeholder for count(distinct DIM) aggregation + for (ColumnDesc tableCol : tableColumns) { + if (measureCols.contains(tableCol)) { + continue; + } + + ParameterDesc colParameter = new ParameterDesc(); + colParameter.setType(FunctionDesc.PARAMETER_TYPE_COLUMN); + colParameter.setValue(tableCol.getName()); + FunctionDesc distinctCntFunc = new FunctionDesc(); + distinctCntFunc.setExpression(FunctionDesc.FUNC_COUNT_DISTINCT); + distinctCntFunc.setParameter(colParameter); + distinctCntFunc.setReturnType("bigint"); + + String fieldName = distinctCntFunc.getRewriteFieldName(); + if (!metFields.contains(fieldName)) { + metFields.add(fieldName); + ColumnDesc fakeCountCol = new ColumnDesc(); + fakeCountCol.setName(fieldName); + fakeCountCol.setDatatype(distinctCntFunc.getSQLType().toString()); + fakeCountCol.init(sourceTable); + metricColumns.add(fakeCountCol); + } } //if exist sum(x), where x is integer/short/byte diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java index 9716ff6..417fc04 100644 --- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java +++ b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java @@ -20,6 +20,7 @@ package org.apache.kylin.query.sqlfunc; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Set; import org.apache.kylin.common.hll.HyperLogLogPlusCounter; import org.slf4j.Logger; @@ -40,6 +41,9 @@ public class HLLDistinctCountAggFunc { if (v instanceof Long) { // holistic case long l = (Long) v; return new FixedValueHLLCMockup(l); + } else if (v instanceof Set) { // dimension distinct count case + Set s = (Set) v; + return new DimensionDistinctCountHLLMockup(s); } else { HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; return new HyperLogLogPlusCounter(c); @@ -58,6 +62,17 @@ public class HLLDistinctCountAggFunc { ((FixedValueHLLCMockup) counter).set(l); return counter; } + } else if (v instanceof Set) { // dimension distinct count case + Set s = (Set) v; + if (counter == null) { + return new DimensionDistinctCountHLLMockup(s); + } else { + if (!(counter instanceof DimensionDistinctCountHLLMockup)) + throw new IllegalStateException("counter is not DimensionDistinctCountHLLMockup"); + + ((DimensionDistinctCountHLLMockup) counter).set(s); + return counter; + } } else { HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; if (counter == null) { @@ -149,4 +164,73 @@ public class HLLDistinctCountAggFunc { } } + private static class DimensionDistinctCountHLLMockup extends HyperLogLogPlusCounter { + + private Set set = null; + + DimensionDistinctCountHLLMockup(Set set) { + this.set = set; + } + + public void set(Set set) { + if (this.set == null) { + this.set = set; + } else { + this.set.addAll(set); + } + } + + @Override + public void clear() { + this.set = null; + } + + @Override + protected void add(long hash) { + throw new UnsupportedOperationException(); + } + + @Override + public void merge(HyperLogLogPlusCounter another) { + throw new UnsupportedOperationException(); + } + + @Override + public long getCountEstimate() { + return (long)set.size(); + } + + @Override + public void writeRegisters(ByteBuffer out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void readRegisters(ByteBuffer in) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + int setHashCode = set.hashCode(); + result = prime * result + setHashCode ^ (setHashCode >>> 32); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (getClass() != obj.getClass()) + return false; + DimensionDistinctCountHLLMockup other = (DimensionDistinctCountHLLMockup) obj; + if (!set.equals(other.set)) + return false; + return true; + } + } } diff --git a/query/src/test/resources/query/sql/query82.sql b/query/src/test/resources/query/sql/query82.sql new file mode 100644 index 0000000..a5350c4 --- /dev/null +++ b/query/src/test/resources/query/sql/query82.sql @@ -0,0 +1,21 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select seller_id, lstg_site_id, count(DISTINCT leaf_categ_id) as CategCount +from test_kylin_fact +group by seller_id, lstg_site_id \ No newline at end of file diff --git a/query/src/test/resources/query/sql/query83.sql b/query/src/test/resources/query/sql/query83.sql new file mode 100644 index 0000000..c1b56c4 --- /dev/null +++ b/query/src/test/resources/query/sql/query83.sql @@ -0,0 +1,24 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select test_kylin_fact.lstg_format_name, sum(test_kylin_fact.price) as GMV, count(*) as TRANS_CNT, count(DISTINCT test_cal_dt.week_beg_dt) as DT_CNT + from test_kylin_fact + inner JOIN edw.test_cal_dt as test_cal_dt + ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt + where test_cal_dt.week_beg_dt between DATE '2013-05-01' and DATE '2013-08-01' + group by test_kylin_fact.lstg_format_name, test_cal_dt.week_beg_dt \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java index 9efbb79..9c679eb 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java @@ -20,14 +20,12 @@ package org.apache.kylin.storage.hbase; import java.io.IOException; import java.text.MessageFormat; -import java.util.BitSet; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.common.collect.Sets; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; @@ -48,6 +46,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowKeyDecoder; import org.apache.kylin.cube.kv.RowValueDecoder; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.metadata.filter.TupleFilter; @@ -210,11 +209,18 @@ public class CubeSegmentTupleIterator implements ITupleIterator { Iterator iter = null; try { + Collection validRowValueDecoders = Collections2.filter(rowValueDecoders, new Predicate() { + @Override + public boolean apply(RowValueDecoder input) { + return input != null && input.getHBaseColumn() != null; + } + }); + scan = buildScan(keyRange); applyFuzzyFilter(scan, keyRange); logScan(keyRange); - scanner = ObserverEnabler.scanWithCoprocessorIfBeneficial(cubeSeg, keyRange.getCuboid(), filter, groupBy, rowValueDecoders, context, table, scan); + scanner = ObserverEnabler.scanWithCoprocessorIfBeneficial(cubeSeg, keyRange.getCuboid(), filter, groupBy, validRowValueDecoders, context, table, scan); iter = scanner.iterator(); } catch (Throwable t) { @@ -257,9 +263,11 @@ public class CubeSegmentTupleIterator implements ITupleIterator { scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); for (RowValueDecoder valueDecoder : this.rowValueDecoders) { HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn(); - byte[] byteFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName()); - byte[] byteQualifier = Bytes.toBytes(hbaseColumn.getQualifier()); - scan.addColumn(byteFamily, byteQualifier); + if (hbaseColumn != null) { + byte[] byteFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName()); + byte[] byteQualifier = Bytes.toBytes(hbaseColumn.getQualifier()); + scan.addColumn(byteFamily, byteQualifier); + } } scan.setStartRow(keyRange.getStartKey()); scan.setStopRow(keyRange.getStopKey()); @@ -347,11 +355,13 @@ public class CubeSegmentTupleIterator implements ITupleIterator { } private void translateResult(Result res, Tuple tuple) throws IOException { + Map aliasMap = context.getAliasMap(); + // groups byte[] rowkey = res.getRow(); rowKeyDecoder.decode(rowkey); List columns = rowKeyDecoder.getColumns(); - List dimensionNames = rowKeyDecoder.getNames(context.getAliasMap()); + List dimensionNames = rowKeyDecoder.getNames(aliasMap); List dimensionValues = rowKeyDecoder.getValues(); for (int i = 0; i < dimensionNames.size(); i++) { TblColRef column = columns.get(i); @@ -369,16 +379,34 @@ public class CubeSegmentTupleIterator implements ITupleIterator { // aggregations for (RowValueDecoder rowValueDecoder : this.rowValueDecoders) { HBaseColumnDesc hbaseColumn = rowValueDecoder.getHBaseColumn(); - String columnFamily = hbaseColumn.getColumnFamilyName(); - String qualifier = hbaseColumn.getQualifier(); - // FIXME: avoidable bytes array creation, why not use res.getValueAsByteBuffer directly? - byte[] valueBytes = res.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); - rowValueDecoder.decode(valueBytes); List measureNames = rowValueDecoder.getNames(); - Object[] measureValues = rowValueDecoder.getValues(); BitSet projectionIndex = rowValueDecoder.getProjectionIndex(); - for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { - tuple.setMeasureValue(measureNames.get(i), measureValues[i]); + + if (hbaseColumn != null) { + String columnFamily = hbaseColumn.getColumnFamilyName(); + String qualifier = hbaseColumn.getQualifier(); + // FIXME: avoidable bytes array creation, why not use res.getValueAsByteBuffer directly? + byte[] valueBytes = res.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + rowValueDecoder.decode(valueBytes); + Object[] measureValues = rowValueDecoder.getValues(); + for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { + tuple.setMeasureValue(measureNames.get(i), measureValues[i]); + } + } else { + // if hbaseColumn == null, then the aggregation is on dimension, Set is used to stores the values retrieved from row key. + MeasureDesc[] measureDescs = rowValueDecoder.getMeasures(); + CubeDesc cubeDesc = cube.getDescriptor(); + for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { + List measureValues = Lists.newArrayList(); + Set measureValueSet = Sets.newHashSet(); + + // Here uses List to store values of dimensions in measure, and uses Set as Counter of Distinct Count calculation + for (TblColRef measureTblCol : measureDescs[i].getFunction().getParameter().getColRefs()) { + measureValues.add(tuple.getValue(measureTblCol.getName())); + } + measureValueSet.add(measureValues); + tuple.setMeasureValue(measureNames.get(i), measureValueSet); + } } } } diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java index 8eb7bcb..bdc8b23 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java @@ -152,9 +152,10 @@ public class CubeStorageEngine implements IStorageEngine { } } + Set cubeDimensions = cubeDesc.listDimensionColumnsIncludingDerived(); for (TblColRef column : sqlDigest.allColumns) { // skip measure columns - if (sqlDigest.metricColumns.contains(column)) { + if (sqlDigest.metricColumns.contains(column) && !cubeDimensions.contains(column)) { continue; } dimensions.add(column); @@ -342,31 +343,54 @@ public class CubeStorageEngine implements IStorageEngine { private List translateAggregation(HBaseMappingDesc hbaseMapping, Collection metrics, // StorageContext context) { Map codecMap = Maps.newHashMap(); + List dimensionCountDistinctList = Lists.newArrayList(); + Set cubeDimensions = cubeDesc.listDimensionColumnsIncludingDerived(); + for (FunctionDesc aggrFunc : metrics) { Collection hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc); if (hbCols.isEmpty()) { - throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression()); - } - HBaseColumnDesc bestHBCol = null; - int bestIndex = -1; - for (HBaseColumnDesc hbCol : hbCols) { - bestHBCol = hbCol; - bestIndex = hbCol.findMeasureIndex(aggrFunc); - MeasureDesc measure = hbCol.getMeasures()[bestIndex]; - // criteria for holistic measure: Exact Aggregation && Exact Cuboid - if (measure.isHolisticCountDistinct() && context.isExactAggregation()) { - logger.info("Holistic count distinct chosen for " + aggrFunc); - break; + if (!cubeDimensions.containsAll(aggrFunc.getParameter().getColRefs())) { + throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression()); + } + logger.info("Found distinct count aggregation on dimensions: " + aggrFunc + ", do not use HyperLogLog."); + + MeasureDesc fakeMeasureDesc = new MeasureDesc(); + fakeMeasureDesc.setFunction(aggrFunc); + aggrFunc.setReturnType("bigint"); + dimensionCountDistinctList.add(fakeMeasureDesc); + } else { + HBaseColumnDesc bestHBCol = null; + int bestIndex = -1; + for (HBaseColumnDesc hbCol : hbCols) { + bestHBCol = hbCol; + bestIndex = hbCol.findMeasureIndex(aggrFunc); + MeasureDesc measure = hbCol.getMeasures()[bestIndex]; + // criteria for holistic measure: Exact Aggregation && Exact Cuboid + if (measure.isHolisticCountDistinct() && context.isExactAggregation()) { + logger.info("Holistic count distinct chosen for " + aggrFunc); + break; + } + } + + RowValueDecoder codec = codecMap.get(bestHBCol); + if (codec == null) { + codec = new RowValueDecoder(bestHBCol); + codecMap.put(bestHBCol, codec); } + codec.setIndex(bestIndex); } + } - RowValueDecoder codec = codecMap.get(bestHBCol); - if (codec == null) { - codec = new RowValueDecoder(bestHBCol); - codecMap.put(bestHBCol, codec); + if (!dimensionCountDistinctList.isEmpty()) { + MeasureDesc[] dimensionCountDistinctArr = new MeasureDesc[dimensionCountDistinctList.size()]; + dimensionCountDistinctArr = dimensionCountDistinctList.toArray(dimensionCountDistinctArr); + RowValueDecoder codec = new RowValueDecoder(dimensionCountDistinctArr); + for (int i = 0; i < dimensionCountDistinctList.size(); i++) { + codec.setIndex(i); } - codec.setIndex(bestIndex); + codecMap.put(null, codec); } + return new ArrayList(codecMap.values()); } -- 2.3.8 (Apple Git-58)