From 7ce5fba676107d91b152bce01371cce86f24e8e6 Mon Sep 17 00:00:00 2001
From: wangxiaoyu
Date: Wed, 30 Dec 2015 22:44:33 +0800
Subject: [PATCH] KYLIN-1122 Kylin support detail data query from fact table

Add a RAW measure so that a query with no GROUP BY and no aggregation can
return the stored values of a column instead of the previous SUM-based
placeholder output.

- RawMeasureType / RawAggregator / RawSerializer: new measure type whose
  value is a list of dictionary-encoded column values. The ingester encodes
  each literal with the column's dictionary, the aggregator concatenates the
  lists, and the tuple filler decodes one value per output row.
- MeasureTypeFactory: register RawMeasureType.Factory.
- FunctionDesc / BasicMeasureType: declare FUNC_RAW and require its return
  type to be "raw".
- Tuple: coerce Long and String field values to the column's declared type.
- OLAPEnumerator: for a non-aggregated query, request RAW() on every
  selected column instead of grouping by dimensions and summing measures.
- webapp: offer RAW in the measure editor with return type "raw".
---
Review notes (text between "---" and the diffstat is not part of the commit):

* The patch text had been collapsed onto a few lines. Line breaks and
  indentation are restored here; indentation of context lines is a
  reconstruction and should be checked against the tree before applying.
* Generic type arguments that had been stripped from the added Java sources
  (leaving a dangling ">") are restored from how the values are used.
  The "From:" e-mail address and the "@@" heading text are left as found.
* Fixes made in place (hunk sizes and the diffstat are unchanged):
  - Tuple: '!"varchar" || !"char"' was always true; now '&&'.
  - RawMeasureType.fillTuplle: decode using the ByteArray's own offset and
    length instead of the whole backing array.
  - RawSerializer.serialize: write only each value's slice, so the bytes
    written match the byte count stored in the header.
  - RawMeasureType.validate: exceptions now carry a message.
  - OLAPEnumerator: use FunctionDesc.FUNC_RAW instead of the literal "RAW".
* The "index" post-image hashes below were not regenerated after these fixes.

 .../apache/kylin/measure/MeasureTypeFactory.java   |   2 +
 .../kylin/measure/basic/BasicMeasureType.java      |   4 +
 .../apache/kylin/measure/raw/RawAggregator.java    |  65 ++++++
 .../apache/kylin/measure/raw/RawMeasureType.java   | 230 +++++++++++++++++++++
 .../apache/kylin/measure/raw/RawSerializer.java    |  92 +++++++++
 .../apache/kylin/metadata/model/FunctionDesc.java  |   1 +
 .../org/apache/kylin/metadata/tuple/Tuple.java     |   9 +-
 .../kylin/query/enumerator/OLAPEnumerator.java     |  50 ++---
 webapp/app/js/controllers/cubeMeasures.js          |   3 +
 webapp/app/js/model/cubeConfig.js                  |   2 +-
 10 files changed, 427 insertions(+), 31 deletions(-)
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java

diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index 5c2e6ed..067ab5c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.kylin.measure.basic.BasicMeasureType;
 import org.apache.kylin.measure.hllc.HLLCMeasureType;
+import org.apache.kylin.measure.raw.RawMeasureType;
 import org.apache.kylin.measure.topn.TopNMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
@@ -92,6 +93,7 @@ abstract public class MeasureTypeFactory {
         // two built-in advanced measure types
         factoryInsts.add(new HLLCMeasureType.Factory());
         factoryInsts.add(new TopNMeasureType.Factory());
+        factoryInsts.add(new RawMeasureType.Factory());
 
         /*
          * Maybe do classpath search for more custom measure types?
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index b8d201e..9447ab0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -77,6 +77,10 @@ public class BasicMeasureType extends MeasureType {
             if (rtype.isNumberFamily() == false) {
                 throw new IllegalArgumentException("Return type for function " + funcName + " must be one of " + DataType.NUMBER_FAMILY);
             }
+        } else if (funcName.equals(FunctionDesc.FUNC_RAW)) {
+            if (!FunctionDesc.FUNC_RAW.equalsIgnoreCase(rtype.getName())) {
+                throw new IllegalArgumentException("Return type for function " + funcName + " must be raw ");
+            }
         } else {
             KylinConfig config = KylinConfig.getInstanceFromEnv();
             if (config.isQueryIgnoreUnknownFunction() == false)
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
new file mode 100644
index 0000000..fa0b539
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.raw;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * RAW data Aggregator
+ */
+public class RawAggregator extends MeasureAggregator<List<ByteArray>> {
+
+    List<ByteArray> list = null;
+
+    @Override
+    public void reset() {
+        list = null;
+    }
+
+    @Override
+    public void aggregate(List<ByteArray> value) {
+        if (list == null) {
+            list = new ArrayList<>();
+        }
+        if(value != null) {
+            list.addAll(value);
+        }
+    }
+
+    @Override
+    public List<ByteArray> getState() {
+        return list;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        int bytes = 0;
+        if(list != null) {
+            for (ByteArray array : list) {
+                bytes += array.length() + 1;
+            }
+        }
+        return bytes;
+    }
+
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
new file mode 100644
index 0000000..92044de
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.raw;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class RawMeasureType extends MeasureType<List<ByteArray>> {
+
+    private static final Logger logger = LoggerFactory.getLogger(RawMeasureType.class);
+
+    public static final String FUNC_RAW = "RAW";
+    public static final String DATATYPE_RAW = "raw";
+
+    public static class Factory extends MeasureTypeFactory<List<ByteArray>> {
+
+        @Override
+        public MeasureType<List<ByteArray>> createMeasureType(String funcName, DataType dataType) {
+            return new RawMeasureType(funcName, dataType);
+        }
+
+        @Override
+        public String getAggrFunctionName() {
+            return FUNC_RAW;
+        }
+
+        @Override
+        public String getAggrDataTypeName() {
+            return DATATYPE_RAW;
+        }
+
+        @Override
+        public Class<? extends DataTypeSerializer<List<ByteArray>>> getAggrDataTypeSerializer() {
+            return RawSerializer.class;
+        }
+    }
+
+    private final DataType dataType;
+
+    public RawMeasureType(String funcName, DataType dataType) {
+        this.dataType = dataType;
+    }
+
+    public void validate(FunctionDesc functionDesc) throws IllegalArgumentException {
+        validate(functionDesc.getExpression(), functionDesc.getReturnDataType(), true);
+    }
+
+    private void validate(String funcName, DataType dataType, boolean checkDataType) {
+        if (FUNC_RAW.equals(funcName) == false)
+            throw new IllegalArgumentException("RawMeasureType only supports function " + FUNC_RAW + ", but got " + funcName);
+
+        if (DATATYPE_RAW.equals(dataType.getName()) == false)
+            throw new IllegalArgumentException("Return type for function " + FUNC_RAW + " must be " + DATATYPE_RAW + ", but got " + dataType.getName());
+
+    }
+
+    @Override
+    public boolean isMemoryHungry() {
+        return true;
+    }
+
+    @Override
+    public MeasureIngester<List<ByteArray>> newIngester() {
+        return new MeasureIngester<List<ByteArray>>() {
+            //encode measure value to dictionary
+            @Override
+            public List<ByteArray> valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+                if (values.length != 1)
+                    throw new IllegalArgumentException();
+
+                String literal = values[0];
+                // encode literal using dictionary
+                TblColRef literalCol = getRawColumn(measureDesc.getFunction());
+                Dictionary<String> dictionary = dictionaryMap.get(literalCol);
+                int keyEncodedValue = dictionary.getIdFromValue(literal);
+
+                ByteArray key = new ByteArray(dictionary.getSizeOfId());
+                BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, dictionary.getSizeOfId());
+
+                List<ByteArray> valueList = new ArrayList<ByteArray>(1);
+                valueList.add(key);
+                return valueList;
+            }
+
+            //merge measure dictionary
+            @Override
+            public List<ByteArray> reEncodeDictionary(List<ByteArray> value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
+                TblColRef colRef = getRawColumn(measureDesc.getFunction());
+                Dictionary<String> sourceDict = oldDicts.get(colRef);
+                Dictionary<String> mergedDict = newDicts.get(colRef);
+
+                int valueSize = value.size();
+                byte[] newIdBuf = new byte[valueSize * mergedDict.getSizeOfId()];
+                byte[] literal = new byte[sourceDict.getSizeOfValue()];
+
+                int bufOffset = 0;
+                for (ByteArray c : value) {
+                    int oldId = BytesUtil.readUnsigned(c.array(), c.offset(), c.length());
+                    int newId;
+                    int size = sourceDict.getValueBytesFromId(oldId, literal, 0);
+                    if (size < 0) {
+                        newId = mergedDict.nullId();
+                    } else {
+                        newId = mergedDict.getIdFromValueBytes(literal, 0, size);
+                    }
+
+                    BytesUtil.writeUnsigned(newId, newIdBuf, bufOffset, mergedDict.getSizeOfId());
+                    c.set(newIdBuf, bufOffset, mergedDict.getSizeOfId());
+                    bufOffset += mergedDict.getSizeOfId();
+                }
+                return value;
+            }
+        };
+    }
+
+    @Override
+    public MeasureAggregator<List<ByteArray>> newAggregator() {
+        return new RawAggregator();
+    }
+
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(FunctionDesc functionDesc) {
+        TblColRef literalCol = functionDesc.getParameter().getColRefs().get(0);
+        return Collections.singletonList(literalCol);
+    }
+
+    @Override
+    public boolean needRewrite() {
+        return false;
+    }
+
+    @Override
+    public Class<?> getRewriteCalciteAggrFunctionClass() {
+        return null;
+    }
+
+    @Override
+    public void adjustSqlDigest(MeasureDesc measureDesc, SQLDigest sqlDigest) {
+
+    }
+
+    @Override
+    public boolean needAdvancedTupleFilling() {
+        return true;
+    }
+
+    @Override
+    public void fillTupleSimply(Tuple tuple, int indexInTuple, Object measureValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo tupleInfo, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        final TblColRef literalCol = getRawColumn(function);
+        final Dictionary<String> rawColDict = dictionaryMap.get(literalCol);
+        final int literalTupleIdx = tupleInfo.hasColumn(literalCol) ? tupleInfo.getColumnIndex(literalCol) : -1;
+
+        return new IAdvMeasureFiller() {
+            private List<ByteArray> rawList;
+            private Iterator<ByteArray> rawIterator;
+            private int expectRow = 0;
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void reload(Object measureValue) {
+                this.rawList = (List<ByteArray>) measureValue;
+                this.rawIterator = rawList.iterator();
+                this.expectRow = 0;
+            }
+
+            @Override
+            public int getNumOfRows() {
+                return rawList.size();
+            }
+
+            @Override
+            public void fillTuplle(Tuple tuple, int row) {
+                if (expectRow++ != row)
+                    throw new IllegalStateException();
+
+                ByteArray raw = rawIterator.next();
+                int key = BytesUtil.readUnsigned(raw.array(), raw.offset(), raw.length()); // decode only this value's slice; reEncodeDictionary() makes values share one backing array
+                String colValue = rawColDict.getValueFromId(key);
+                tuple.setMeasureValue(literalTupleIdx, colValue);
+            }
+        };
+    }
+
+    private TblColRef getRawColumn(FunctionDesc functionDesc) {
+        return functionDesc.getParameter().getColRefs().get(0);
+    }
+
+    @Override
+    public boolean onlyAggrInBaseCuboid() {
+        return true;
+    }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
new file mode 100644
index 0000000..884f777
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.raw;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class RawSerializer extends DataTypeSerializer<List<ByteArray>> {
+
+
+    public RawSerializer(DataType dataType) {
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+        int len = 0;
+        if(in.hasRemaining()) {
+            int size = in.getInt();
+            int bytes = in.getInt();
+            len = in.position() - mark + bytes;
+        }
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return 1024 * 1024;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 1024 * 1024;
+    }
+
+    @Override
+    public void serialize(List<ByteArray> value, ByteBuffer out) {
+        if(value != null) {
+            int bytes = 0;
+            for (ByteArray array : value) {
+                bytes += (array.length() + 1);
+            }
+            if (bytes > out.remaining()) {
+                throw new RuntimeException("BufferOverflow! Please use one higher cardinality column for dimension column when build RAW cube!");
+            }
+            out.putInt(value.size());
+            out.putInt(bytes);
+            for (ByteArray array : value) {
+                BytesUtil.writeByteArray(array.array() == null ? null : java.util.Arrays.copyOfRange(array.array(), array.offset(), array.offset() + array.length()), out); // write only this value's slice so the payload matches the byte count above
+            }
+        }
+    }
+
+    @Override
+    public List<ByteArray> deserialize(ByteBuffer in) {
+        List<ByteArray> value = null;
+        if(in.hasRemaining()) {
+            int size = in.getInt();
+            int bytes = in.getInt();
+            value = new ArrayList<ByteArray>(size);
+            for (int i = 0; i < size; i++) {
+                value.add(new ByteArray(BytesUtil.readByteArray(in)));
+            }
+        }
+        return value;
+    }
+
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 36c8722..624a73c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -40,6 +40,7 @@ public class FunctionDesc {
     public static final String FUNC_MIN = "MIN";
     public static final String FUNC_MAX = "MAX";
     public static final String FUNC_COUNT = "COUNT";
+    public static final String FUNC_RAW = "RAW";
 
     public static final String PARAMETER_TYPE_CONSTANT = "constant";
     public static final String PARAMETER_TYPE_COLUMN = "column";
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
index d38aafd..9e888ea 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
@@ -111,10 +111,17 @@ public class Tuple implements ITuple {
         // BigDecimal during cube build for best precision
         if ("double".equals(dataType) && fieldValue instanceof BigDecimal) {
             fieldValue = ((BigDecimal) fieldValue).doubleValue();
-        } else if ("integer".equals(dataType) && !(fieldValue instanceof Integer)) {
+        } else if ("integer".equals(dataType) && fieldValue instanceof Number) {
             fieldValue = ((Number) fieldValue).intValue();
         } else if ("float".equals(dataType) && fieldValue instanceof BigDecimal) {
             fieldValue = ((BigDecimal) fieldValue).floatValue();
+        } else if ("date".equals(dataType) && fieldValue instanceof Long) {
+            long millis = ((Long)fieldValue).longValue();
+            fieldValue = (int) (millis / (1000 * 3600 * 24));
+        } else if ("smallint".equals(dataType) && fieldValue instanceof Long) {
+            fieldValue = ((Long)fieldValue).shortValue();
+        } else if (!"varchar".equals(dataType) && !"char".equals(dataType) && fieldValue instanceof String) { // "&&": with "||" the type test was always true
+            fieldValue = convertOptiqCellValue((String)fieldValue, getDataTypeName(idx));
         }
         values[idx] = fieldValue;
     }
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index aa8b2a7..4ad3ab8 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -178,48 +178,40 @@ public class OLAPEnumerator implements Enumerator {
             return;
 
         // If no group by and metric found, then it's simple query like select ... from ... where ...,
-        // But we have no raw data stored, in order to return better results, we hack to output sum of metric column
-        logger.info("No group by and aggregation found in this query, will hack some result for better look of output...");
+        logger.info("No group by and aggregation found in this query, will trigger raw query...");
 
         // If it's select * from ...,
         // We need to retrieve cube to manually add columns into sqlDigest, so that we have full-columns results as output.
         IRealization cube = olapContext.realization;
         boolean isSelectAll = sqlDigest.allColumns.isEmpty() || sqlDigest.allColumns.equals(sqlDigest.filterColumns);
         for (TblColRef col : cube.getAllColumns()) {
-            if (col.getTable().equals(sqlDigest.factTable) && (cube.getAllDimensions().contains(col) || isSelectAll)) {
+            if (col.getTable().equals(sqlDigest.factTable) && isSelectAll) {
                 sqlDigest.allColumns.add(col);
             }
         }
 
         for (TblColRef col : sqlDigest.allColumns) {
-            // For dimension columns, take them as group by columns.
-            if (cube.getAllDimensions().contains(col)) {
-                sqlDigest.groupbyColumns.add(col);
-            }
-            // For measure columns, take them as metric columns with aggregation function SUM().
-            else {
-                ParameterDesc colParameter = new ParameterDesc();
-                colParameter.setType("column");
-                colParameter.setValue(col.getName());
-                FunctionDesc sumFunc = new FunctionDesc();
-                sumFunc.setExpression("SUM");
-                sumFunc.setParameter(colParameter);
-
-                boolean measureHasSum = false;
-                for (MeasureDesc colMeasureDesc : cube.getMeasures()) {
-                    if (colMeasureDesc.getFunction().equals(sumFunc)) {
-                        measureHasSum = true;
-                        break;
-                    }
-                }
-                if (measureHasSum) {
-                    sqlDigest.aggregations.add(sumFunc);
-                } else {
-                    logger.warn("SUM is not defined for measure column " + col + ", output will be meaningless.");
+            //For columns which has defined raw measure function, take them as metric columns with aggregation function RAW().
+            ParameterDesc colParameter = new ParameterDesc();
+            colParameter.setType("column");
+            colParameter.setValue(col.getName());
+            FunctionDesc rawFunc = new FunctionDesc();
+            rawFunc.setExpression(FunctionDesc.FUNC_RAW);
+            rawFunc.setParameter(colParameter);
+
+            boolean measureHasRaw = false;
+            for (MeasureDesc colMeasureDesc : cube.getMeasures()) {
+                if (colMeasureDesc.getFunction().equals(rawFunc)) {
+                    measureHasRaw = true;
+                    break;
                 }
-
-                sqlDigest.metricColumns.add(col);
             }
+            if (measureHasRaw) {
+                sqlDigest.aggregations.add(rawFunc);
+            } else {
+                logger.warn("RAW is not defined for measure column " + col + ", output will be meaningless.");
+            }
+            sqlDigest.metricColumns.add(col);
         }
     }
 
diff --git a/webapp/app/js/controllers/cubeMeasures.js b/webapp/app/js/controllers/cubeMeasures.js
index 1560d23..43bb67f 100644
--- a/webapp/app/js/controllers/cubeMeasures.js
+++ b/webapp/app/js/controllers/cubeMeasures.js
@@ -87,6 +87,9 @@ KylinApp.controller('CubeMeasuresCtrl', function ($scope, $modal,MetaModel,cubes
         case "MAX":
           $scope.newMeasure.function.returntype = colType;
           break;
+        case "RAW":
+          $scope.newMeasure.function.returntype = "raw";
+          break;
         case "COUNT":
           $scope.newMeasure.function.returntype = "bigint";
           break;
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index a647b53..4250c15 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -20,7 +20,7 @@ KylinApp.constant('cubeConfig', {
 
   //~ Define metadata & class
   measureParamType: ['column', 'constant'],
-  measureExpressions: ['SUM', 'MIN', 'MAX', 'COUNT', 'COUNT_DISTINCT'],
+  measureExpressions: ['SUM', 'MIN', 'MAX', 'COUNT', 'COUNT_DISTINCT', 'RAW'],
   dimensionDataTypes: ["string", "tinyint", "int", "bigint", "date"],
   cubeCapacities: ["SMALL", "MEDIUM", "LARGE"],
   // cubePartitionTypes : ['APPEND', 'UPDATE_INSERT'],
-- 
2.1.0