From f37894ee5bad635558f537ecadd0b46c65ea35ad Mon Sep 17 00:00:00 2001 From: terry Date: Thu, 3 Nov 2016 21:20:01 +0800 Subject: [PATCH] KYLIN-2064, add non-runtime-aggregation measure for distinct count Signed-off-by: terry --- .../kylin/measure/BufferedMeasureEncoder.java | 24 +++ .../apache/kylin/measure/MeasureTypeFactory.java | 4 + .../measure/nonaggregation/NonAggrAggregator.java | 61 +++++++ .../nonaggregation/NonAggrBitmapCounter.java | 107 +++++++++++ .../nonaggregation/NonAggrBitmapMeasureType.java | 127 +++++++++++++ .../measure/nonaggregation/NonAggrCounter.java | 190 ++++++++++++++++++++ .../NonAggrDistinctCountAggFunc.java | 51 ++++++ .../nonaggregation/NonAggrHLLCMeasureType.java | 130 ++++++++++++++ .../measure/nonaggregation/NonAggrHLLCounter.java | 104 +++++++++++ .../nonaggregation/NonAggrMeasureSerializer.java | 109 +++++++++++ .../apache/kylin/metadata/datatype/DataType.java | 10 ++ .../metadata/datatype/DataTypeSerializer.java | 4 + .../measure/nonaggregation/NonAggrCounterTest.java | 113 ++++++++++++ .../measure/nonaggregation/NonAggrFlushTest.java | 88 +++++++++ .../nonaggregation/NonAggrSerializerTest.java | 70 ++++++++ .../kylin/storage/hbase/steps/KeyValueCreator.java | 7 +- 16 files changed, 1197 insertions(+), 2 deletions(-) create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrAggregator.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrBitmapCounter.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrBitmapMeasureType.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrCounter.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrDistinctCountAggFunc.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrHLLCMeasureType.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrHLLCounter.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrMeasureSerializer.java create mode 100644 core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrCounterTest.java create mode 100644 core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrFlushTest.java create mode 100644 core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrSerializerTest.java diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java index 2b14715..dc4f17d 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java @@ -104,4 +104,28 @@ public class BufferedMeasureEncoder { } } } + + public ByteBuffer flush(Object[] values) { + if (buf == null) { + setBufferSize(DEFAULT_BUFFER_SIZE); + } + assert values.length == codec.nMeasures; + while (true) { + try { + buf.clear(); + for (int i = 0, pos = 0; i < codec.nMeasures; i++) { + codec.serializers[i].flush(values[i], buf); + measureSizes[i] = buf.position() - pos; + pos = buf.position(); + } + return buf; + + } catch (BufferOverflowException boe) { + if (buf.capacity() >= MAX_BUFFER_SIZE) + throw boe; + + setBufferSize(buf.capacity() * 2); + } + } + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java index 17d841a..33fe8c0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -28,6 +28,8 @@ import org.apache.kylin.measure.bitmap.BitmapMeasureType; import org.apache.kylin.measure.dim.DimCountDistinctMeasureType; import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType; import org.apache.kylin.measure.hllc.HLLCMeasureType; +import org.apache.kylin.measure.nonaggregation.NonAggrBitmapMeasureType; +import org.apache.kylin.measure.nonaggregation.NonAggrHLLCMeasureType; import org.apache.kylin.measure.raw.RawMeasureType; import org.apache.kylin.measure.topn.TopNMeasureType; import org.apache.kylin.metadata.datatype.DataType; @@ -106,6 +108,8 @@ abstract public class MeasureTypeFactory { factoryInsts.add(new TopNMeasureType.Factory()); factoryInsts.add(new RawMeasureType.Factory()); factoryInsts.add(new ExtendedColumnMeasureType.Factory()); + factoryInsts.add(new NonAggrHLLCMeasureType.Factory()); + factoryInsts.add(new NonAggrBitmapMeasureType.Factory()); logger.info("Checking custom measure types from kylin config"); diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrAggregator.java new file mode 100644 index 0000000..70820cf --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrAggregator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import org.apache.kylin.measure.MeasureAggregator; + +public class NonAggrAggregator extends MeasureAggregator { + private static final long serialVersionUID = 1L; + protected NonAggrCounter sum = null; + /** + * HLLC:hyperloglog measure with non-aggregation. + * BITMAP:bitmap measure with non-aggregation. + */ + public enum NonAggrType { + HLLC, + BITMAP + } + + @Override + public void reset() { + sum = null; + } + @Override + public void aggregate(NonAggrCounter value) { + if (sum == null) { + if(value.getType() == NonAggrType.HLLC) { + sum = new NonAggrHLLCounter(value); + } else if(value.getType() == NonAggrType.BITMAP) { + sum = new NonAggrBitmapCounter(value); + } + } else + sum.merge(value); + } + @Override + public NonAggrCounter getState() { + return sum; + } + @Override + public int getMemBytesEstimate() { + if(sum == null) { + return Integer.MIN_VALUE; + } else { + return sum.maxLength(); + } + } +} \ No newline at end of file diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrBitmapCounter.java new file mode 100644 index 0000000..35c84d2 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrBitmapCounter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.kylin.measure.bitmap.BitmapCounter; +import org.apache.kylin.measure.nonaggregation.NonAggrAggregator.NonAggrType; + +@SuppressWarnings("serial") +public class NonAggrBitmapCounter extends NonAggrCounter { + private BitmapCounter counter; + + public NonAggrBitmapCounter() { + finalCount = INVALID_VALUE; + status = NonAggrStatus.INTERMEDIATE; + counter = new BitmapCounter(); + } + + public NonAggrBitmapCounter(NonAggrCounter value) { + this.status = value.status; + if(value.status == NonAggrStatus.FINAL) { + this.finalCount = value.finalCount; + } else if(value.status == NonAggrStatus.INTERMEDIATE){ + NonAggrBitmapCounter v = (NonAggrBitmapCounter) value; + counter = new BitmapCounter(v.counter); + } else { + this.counter = null; + this.finalCount = INVALID_VALUE; + } + } + + @Override + protected void setInvalidCounter() { + counter = null; + } + + @Override + protected void clearCounter() { + counter.clear(); + } + + @Override + protected void addValue(String value) { + counter.add(value); + } + + @Override + protected void addValue(int value) { + counter.add(value); + } + + @Override + protected void mergeCounter(NonAggrCounter another) { + if(another instanceof NonAggrBitmapCounter) { + counter.merge(((NonAggrBitmapCounter)another).counter); + } + } + + @Override + protected long getCounterValue() { + return counter.getCount(); + } + + @Override + protected void write(ByteBuffer out) throws IOException { + counter.writeRegisters(out); + } + + + @Override + protected void read(ByteBuffer in) throws IOException { + counter.readRegisters(in); + } + + @Override + protected int peekDataLength(ByteBuffer in) { + return counter.peekLength(in); + } + + @Override + protected int maxDataLength() { + return counter.getMemBytes(); + } + + @Override + protected NonAggrType getType() { + return NonAggrType.BITMAP; + } + +} \ No newline at end of file diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrBitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrBitmapMeasureType.java new file mode 100644 index 0000000..b57a395 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrBitmapMeasureType.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.SQLDigest; + +public class NonAggrBitmapMeasureType extends MeasureType { + public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; + public static final String DATATYPE_NON_AGGR_BITMAP = "nbitmap"; + + public NonAggrBitmapMeasureType(String funcName, DataType dataType) { + } + + public static class Factory extends MeasureTypeFactory { + + @Override + public MeasureType createMeasureType(String funcName, DataType dataType) { + return new NonAggrBitmapMeasureType(funcName, dataType); + } + + @Override + public String getAggrFunctionName() { + return FUNC_COUNT_DISTINCT; + } + + @Override + public String getAggrDataTypeName() { + return DATATYPE_NON_AGGR_BITMAP; + } + + @Override + public Class> getAggrDataTypeSerializer() { + return NonAggrMeasureSerializer.NonAggrBitmapMeasureSerializer.class; + } + } + + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + if (FUNC_COUNT_DISTINCT.equals(functionDesc.getExpression()) == false) + throw new IllegalArgumentException("non-aggr BitmapMeasureType func is not " + FUNC_COUNT_DISTINCT + " but " + functionDesc.getExpression()); + + if (DATATYPE_NON_AGGR_BITMAP.equals(functionDesc.getReturnDataType().getName()) == false) + throw new IllegalArgumentException("non-aggr BitmapMeasureType datatype is not " + DATATYPE_NON_AGGR_BITMAP + " but " + functionDesc.getReturnDataType().getName()); + + List colRefs = functionDesc.getParameter().getColRefs(); + if (colRefs.size() != 1) { + throw new IllegalArgumentException("non-aggr BitmapMeasureType col parameters count is not 1 but " + colRefs.size()); + } + } + + @Override + public boolean isMemoryHungry() { + return true; + } + + @Override + public boolean needRewrite() { + return true; + } + + @Override + public Class getRewriteCalciteAggrFunctionClass() { + return NonAggrDistinctCountAggFunc.class; + } + + @Override + public MeasureIngester newIngester() { + return new MeasureIngester() { + NonAggrCounter current = new NonAggrBitmapCounter(); + + @Override + public NonAggrCounter valueOf(String[] values, MeasureDesc measureDesc, Map> dictionaryMap) { + NonAggrCounter hllc = current; + hllc.clear(); + for (String v : values) { + if (v != null) + hllc.add(v); + } + return hllc; + } + }; + } + + @Override + public MeasureAggregator newAggregator() { + return new NonAggrAggregator(); + } + + public CapabilityResult.CapabilityInfluence influenceCapabilityCheck(Collection unmatchedDimensions, Collection unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) { + //contain non aggregation measure : cost * 1.1 + return new CapabilityResult.CapabilityInfluence() { + @Override + public double suggestCostMultiplier() { + return 1.1; + } + }; + } +} diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrCounter.java new file mode 100644 index 0000000..6bfa71a --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrCounter.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; + +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.measure.nonaggregation.NonAggrAggregator.NonAggrType; + +@SuppressWarnings("serial") +public abstract class NonAggrCounter implements Serializable { + /** + * status = FINAL means countValue is valid, store final count distinct value. + * status = INTERMEDIATE means counter is valid, store hyperloglog/bitmap infomation. + * status = ERROR means merge operation happens while only store final value. + */ + public enum NonAggrStatus { + FINAL, + INTERMEDIATE, + ERROR + } + + protected long finalCount; + + protected NonAggrStatus status; + protected static long INVALID_VALUE = Long.MIN_VALUE; + + public NonAggrCounter(int precision) { + finalCount = INVALID_VALUE; + status = NonAggrStatus.INTERMEDIATE; + } + + public NonAggrCounter() {} + + protected abstract NonAggrType getType(); + + protected abstract void setInvalidCounter(); + + private void setInvalid() { + this.finalCount = INVALID_VALUE; + setInvalidCounter(); + this.status = NonAggrStatus.ERROR; + } + + protected abstract void clearCounter(); + + public void clear() { + if(status == NonAggrStatus.FINAL) { + this.finalCount = INVALID_VALUE; + } else if(status == NonAggrStatus.INTERMEDIATE) { + clearCounter(); + } else { + setInvalid(); + } + } + + protected abstract void addValue(String value); + protected abstract void addValue(int value); + + public void add(int value) { + if(status == NonAggrStatus.FINAL) { + setInvalid(); + } else if(status == NonAggrStatus.INTERMEDIATE) { + addValue(value); + } + } + + public void add(byte[] value) { + add(value, 0, value.length); + } + + public void add(byte[] value, int offset, int length) { + if (value == null || length == 0) { + return; + } + + add(new String(value, offset, length)); + } + + public void add(String value) { + if(status == NonAggrStatus.FINAL) { + setInvalid(); + } else if(status == NonAggrStatus.INTERMEDIATE) { + addValue(value); + } + } + + protected abstract void mergeCounter(NonAggrCounter another); + + public void merge(NonAggrCounter another) { + if(status == NonAggrStatus.FINAL) { + setInvalid(); + } else if(status == NonAggrStatus.INTERMEDIATE) { + mergeCounter(another); + } + } + + protected abstract long getCounterValue(); + + public long getCountValue() { + if(status == NonAggrStatus.FINAL) { + return this.finalCount; + } else if(status == NonAggrStatus.INTERMEDIATE) { + return getCounterValue(); + } else { + throw new IllegalArgumentException("Can not do aggregation with non-aggregation measure!"); + } + } + + protected abstract void write(final ByteBuffer out) throws IOException; + + public void writeRegisters(final ByteBuffer out) throws IOException { + out.put((byte) status.ordinal()); + if(status == NonAggrStatus.FINAL) { + BytesUtil.writeLong(this.finalCount, out); + } else if(status == NonAggrStatus.INTERMEDIATE) { + write(out); + } + } + + public void setCountValue() { + this.finalCount = getCounterValue(); + clearCounter(); + status = NonAggrStatus.FINAL; + } + + protected abstract void read(ByteBuffer in) throws IOException; + + public void readRegisters(ByteBuffer in) throws IOException { + byte flag = in.get(); + NonAggrStatus status = NonAggrStatus.values()[flag]; + if(status == NonAggrStatus.FINAL) { + this.finalCount = BytesUtil.readLong(in); + } else if(status == NonAggrStatus.INTERMEDIATE) { + read(in); + } else if(status == NonAggrStatus.ERROR) { + setInvalid(); + } else { + throw new IllegalArgumentException("Can not parse non-aggr counter status value " + status); + } + this.status = status; + } + + protected abstract int peekDataLength(ByteBuffer in); + + public int peekLength(ByteBuffer in) { + int mark = in.position(); + int len = 0; + byte flag = in.get(); + NonAggrStatus status = NonAggrStatus.values()[flag]; + if(status == NonAggrStatus.FINAL) { + len = Long.SIZE / 8; + } else if(status == NonAggrStatus.INTERMEDIATE){ + len = peekDataLength(in); + } + len += 1; + in.position(mark); + return len; + } + + protected abstract int maxDataLength(); + + public int maxLength() { + int len = 0; + if(status == NonAggrStatus.FINAL) { + len = Long.SIZE / 8; + } else if(status == NonAggrStatus.INTERMEDIATE) { + len = maxDataLength(); + } + + return len + 1; + } +} \ No newline at end of file diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrDistinctCountAggFunc.java new file mode 100644 index 0000000..21fb500 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrDistinctCountAggFunc.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import org.apache.kylin.measure.nonaggregation.NonAggrAggregator.NonAggrType; + +public class NonAggrDistinctCountAggFunc { + + public static NonAggrCounter init() { + return null; + } + + public static NonAggrCounter add(NonAggrCounter counter, Object v) { + NonAggrCounter c = (NonAggrCounter) v; + if (counter == null) { + if(c.getType() == NonAggrType.HLLC) { + return new NonAggrHLLCounter((NonAggrHLLCounter) c); + } else if(c.getType() == NonAggrType.BITMAP) { + return new NonAggrBitmapCounter((NonAggrBitmapCounter) c); + } else { + return null; + } + } else { + counter.merge(c); + return counter; + } + } + + public static NonAggrCounter merge(NonAggrCounter counter0, Object counter1) { + return add(counter0, counter1); + } + + public static long result(NonAggrCounter counter) { + return counter.getCountValue(); + } +} \ No newline at end of file diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrHLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrHLLCMeasureType.java new file mode 100644 index 0000000..0fd32cc --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrHLLCMeasureType.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import java.util.Collection; +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.SQLDigest; + +public class NonAggrHLLCMeasureType extends MeasureType { + private final DataType dataType; + public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; + public static final String DATATYPE_NON_AGGR_HLLC = "nhllc"; + + public NonAggrHLLCMeasureType(String funcName, DataType dataType) { + this.dataType = dataType; + } + + public static class Factory extends MeasureTypeFactory { + + @Override + public MeasureType createMeasureType(String funcName, DataType dataType) { + return new NonAggrHLLCMeasureType(funcName, dataType); + } + + @Override + public String getAggrFunctionName() { + return FUNC_COUNT_DISTINCT; + } + + @Override + public String getAggrDataTypeName() { + return DATATYPE_NON_AGGR_HLLC; + } + + @Override + public Class> getAggrDataTypeSerializer() { + return NonAggrMeasureSerializer.NonAggrHLLCMeasureSerializer.class; + } + } + + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + validate(functionDesc.getExpression(), functionDesc.getReturnDataType(), true); + } + + private void validate(String funcName, DataType dataType, boolean checkDataType) { + if (FUNC_COUNT_DISTINCT.equals(funcName) == false) + throw new IllegalArgumentException(); + + if (DATATYPE_NON_AGGR_HLLC.equals(dataType.getName()) == false) + throw new IllegalArgumentException(); + + if (dataType.getPrecision() < 1 || dataType.getPrecision() > 5000) + throw new IllegalArgumentException(); + } + + @Override + public boolean isMemoryHungry() { + return true; + } + + @Override + public boolean needRewrite() { + return true; + } + + @Override + public Class getRewriteCalciteAggrFunctionClass() { + return NonAggrDistinctCountAggFunc.class; + } + + @Override + public MeasureIngester newIngester() { + return new MeasureIngester() { + NonAggrCounter current = new NonAggrHLLCounter(dataType.getPrecision()); + + @Override + public NonAggrCounter valueOf(String[] values, MeasureDesc measureDesc, Map> dictionaryMap) { + NonAggrCounter hllc = current; + hllc.clear(); + for (String v : values) { + if (v != null) + hllc.add(v); + } + return hllc; + } + }; + } + + @Override + public MeasureAggregator newAggregator() { + return new NonAggrAggregator(); + } + + public CapabilityResult.CapabilityInfluence influenceCapabilityCheck(Collection unmatchedDimensions, Collection unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) { + //contain non aggregation measure : cost * 1.1 + return new CapabilityResult.CapabilityInfluence() { + @Override + public double suggestCostMultiplier() { + return 1.1; + } + }; + } +} diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrHLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrHLLCounter.java new file mode 100644 index 0000000..e9bafc8 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrHLLCounter.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.nonaggregation.NonAggrAggregator.NonAggrType; + +@SuppressWarnings("serial") +public class NonAggrHLLCounter extends NonAggrCounter { + private HyperLogLogPlusCounter counter; + + public NonAggrHLLCounter(int precision) { + super(precision); + counter = new HyperLogLogPlusCounter(precision); + } + + public NonAggrHLLCounter(NonAggrCounter value) { + this.status = value.status; + if(value.status == NonAggrStatus.FINAL) { + this.finalCount = value.finalCount; + } else if(value.status == NonAggrStatus.INTERMEDIATE){ + NonAggrHLLCounter v = (NonAggrHLLCounter) value; + counter = new HyperLogLogPlusCounter(v.counter); + } else { + this.counter = null; + this.finalCount = INVALID_VALUE; + } + } + + @Override + protected void setInvalidCounter() { + this.counter = null; + } + + @Override + protected void clearCounter() { + this.counter.clear(); + } + + @Override + protected void addValue(String value) { + this.counter.add(value); + } + + @Override + protected void addValue(int value) { + counter.add(value); + } + + @Override + protected void mergeCounter(NonAggrCounter another) { + if(another instanceof NonAggrHLLCounter) { + this.counter.merge(((NonAggrHLLCounter) another).counter); + } + } + + @Override + protected long getCounterValue() { + return this.counter.getCountEstimate(); + } + + @Override + protected void write(ByteBuffer out) throws IOException { + this.counter.writeRegisters(out); + } + + @Override + protected void read(ByteBuffer in) throws IOException { + this.counter.readRegisters(in); + } + + @Override + protected int peekDataLength(ByteBuffer in) { + return counter.peekLength(in); + } + + @Override + protected int maxDataLength() { + return counter.maxLength(); + } + + @Override + protected NonAggrType getType() { + return NonAggrType.HLLC; + } +} \ No newline at end of file diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrMeasureSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrMeasureSerializer.java new file mode 100644 index 0000000..1eb8d54 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/nonaggregation/NonAggrMeasureSerializer.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; + +public abstract class NonAggrMeasureSerializer extends DataTypeSerializer { + + @Override + public void serialize(NonAggrCounter value, ByteBuffer out) { + try { + value.writeRegisters(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected abstract NonAggrCounter current(); + + @Override + public NonAggrCounter deserialize(ByteBuffer in) { + NonAggrCounter counter = current(); + try { + counter.readRegisters(in); + } catch (IOException e) { + throw new RuntimeException(e); + } + return counter; + } + + @Override + public int peekLength(ByteBuffer in) { + return current().peekLength(in); + } + + @Override + public int getStorageBytesEstimate() { + return Long.SIZE / 8 + 1; + } + + @Override + public void flush(NonAggrCounter value, ByteBuffer out) { + value.setCountValue(); + this.serialize(value, out); + } + + public static class NonAggrHLLCMeasureSerializer extends NonAggrMeasureSerializer { + private ThreadLocal current = new ThreadLocal(); + private int precision; + + public NonAggrHLLCMeasureSerializer(DataType type) { + this.precision = type.getPrecision(); + } + + protected NonAggrHLLCounter current() { + NonAggrHLLCounter counter = current.get(); + if (counter == null) { + counter = new NonAggrHLLCounter(precision); + current.set(counter); + } + return counter; + } + + @Override + public int maxLength() { + return current().maxLength(); + } + } + + public static class NonAggrBitmapMeasureSerializer extends NonAggrMeasureSerializer { + private ThreadLocal current = new ThreadLocal(); + + public NonAggrBitmapMeasureSerializer(DataType type) { + } + + protected NonAggrBitmapCounter current() { + NonAggrBitmapCounter counter = current.get(); + if (counter == null) { + counter = new NonAggrBitmapCounter(); + current.set(counter); + } + return counter; + } + + @Override + public int maxLength() { + return current().maxLength(); + } + } +} \ No newline at end of file diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java index b726c5f..866e5e0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java @@ -107,6 +107,12 @@ public class DataType implements Serializable { LEGACY_TYPE_MAP.put("hllc14", "hllc(14)"); LEGACY_TYPE_MAP.put("hllc15", "hllc(15)"); LEGACY_TYPE_MAP.put("hllc16", "hllc(16)"); + + LEGACY_TYPE_MAP.put("nhllc10", "nhllc(10)"); + LEGACY_TYPE_MAP.put("nhllc12", "nhllc(12)"); + LEGACY_TYPE_MAP.put("nhllc14", "nhllc(14)"); + LEGACY_TYPE_MAP.put("nhllc15", "nhllc(15)"); + LEGACY_TYPE_MAP.put("nhllc16", "nhllc(16)"); } private static final ConcurrentMap CACHE = new ConcurrentHashMap(); @@ -260,6 +266,10 @@ public class DataType implements Serializable { return name.equals("decimal"); } + public boolean isNonAggrType() { + return name.equals("nhllc") || name.equals("nbitmap"); + } + public String getName() { return name; } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java index a739377..e2a5559 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java @@ -94,4 +94,8 @@ abstract public class DataTypeSerializer implements BytesSerializer { else return value.toString(); } + + public void flush(T value, ByteBuffer out) { + this.serialize(value, out); + } } diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrCounterTest.java new file mode 100644 index 0000000..1f8c86d --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrCounterTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.apache.kylin.measure.nonaggregation.NonAggrCounter.NonAggrStatus; +import org.junit.Test; + +public class NonAggrCounterTest { + Random rand1 = new Random(1); + + @Test + public void testOneHllcAdd() throws IOException { + NonAggrHLLCounter hllc = new NonAggrHLLCounter(14); + NonAggrHLLCounter one = new NonAggrHLLCounter(14); + for (int i = 0; i < 1000000; i++) { + one.clear(); + one.add(rand1.nextInt()); + hllc.merge(one); + } + assertTrue(hllc.getCountValue() > 1000000 * 0.9); + } + + @Test + public void testOneBitmapAdd() throws IOException { + NonAggrBitmapCounter bitmap = new NonAggrBitmapCounter(); + NonAggrBitmapCounter one = new NonAggrBitmapCounter(); + + Set values = new HashSet(); + for (int i = 0; i < 100000; i++) { + one.clear(); + int nextValue = rand1.nextInt(); + one.add(nextValue); + values.add(nextValue); + bitmap.merge(one); + } + assertTrue(bitmap.getCountValue() == values.size()); + } + + @Test(expected = IllegalArgumentException.class) + public void testHllcInvalidMerge() { + NonAggrHLLCounter hllc = new NonAggrHLLCounter(14); + NonAggrHLLCounter one = new NonAggrHLLCounter(14); + for (int i = 0; i < 1000000; i++) { + one.clear(); + one.add(rand1.nextInt()); + hllc.merge(one); + } + + NonAggrHLLCounter hllc2 = new NonAggrHLLCounter(14); + for (int i = 0; i < 1000000; i++) { + one.clear(); + one.add(rand1.nextInt()); + hllc2.merge(one); + } + + hllc.setCountValue(); + hllc2.setCountValue(); + hllc.merge(hllc2); + assertTrue(hllc.finalCount == NonAggrCounter.INVALID_VALUE); + assertTrue(hllc.status == NonAggrStatus.ERROR); + assertTrue(hllc.getCountValue() == 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testBitmapInvalidMerge() { + NonAggrBitmapCounter bitmap = new NonAggrBitmapCounter(); + NonAggrBitmapCounter one = new NonAggrBitmapCounter(); + + for (int i = 0; i < 100000; i++) { + one.clear(); + int nextValue = rand1.nextInt(); + one.add(nextValue); + bitmap.merge(one); + } + + NonAggrBitmapCounter bitmap2 = new NonAggrBitmapCounter(); + for (int i = 0; i < 100000; i++) { + one.clear(); + int nextValue = rand1.nextInt(); + one.add(nextValue); + bitmap.merge(one); + } + bitmap.setCountValue(); + bitmap.setCountValue(); + bitmap.merge(bitmap2); + + assertTrue(bitmap.finalCount == NonAggrCounter.INVALID_VALUE); + assertTrue(bitmap.status == NonAggrStatus.ERROR); + assertTrue(bitmap.getCountValue() == 0); + } +} diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrFlushTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrFlushTest.java new file mode 100644 index 0000000..be70949 --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrFlushTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Random; + +import org.apache.kylin.measure.nonaggregation.NonAggrCounter.NonAggrStatus; +import org.apache.kylin.metadata.datatype.DataType; +import org.junit.Test; + +public class NonAggrFlushTest { + Random rand1 = new Random(); + + public void testSerde(NonAggrCounter counter, NonAggrCounter one, NonAggrMeasureSerializer serializer) { + for (int i = 0; i < 100000; i++) { + one.clear(); + one.add(rand1.nextInt()); + counter.merge(one); + } + + ByteBuffer buffer = ByteBuffer.allocate(10 * 1024 * 1024); + serializer.serialize(counter, buffer); + System.out.println("serialize length is " + buffer.position()); + + buffer.position(0); + counter = serializer.deserialize(buffer); + for (int i = 0; i < 100000; i++) { + one.clear(); + one.add(rand1.nextInt()); + counter.merge(one); + } + long count = counter.getCountValue(); + + buffer.position(0); + serializer.flush(counter, buffer); + int len = buffer.position(); + System.out.println("flush length is " + len); + buffer.position(0); + one = serializer.deserialize(buffer); + + assertTrue(one.status == NonAggrStatus.FINAL); + System.out.println("new count : " + one.getCountValue()); + assertTrue(count == one.getCountValue()); + + buffer.position(0); + assertTrue(len == serializer.peekLength(buffer)); + } + + @Test + public void testBitmapSerde() { + NonAggrCounter counter = new NonAggrBitmapCounter(); + NonAggrCounter one = new NonAggrBitmapCounter(); + counter.add(1000); + counter.add(111); + NonAggrMeasureSerializer serializer = new NonAggrMeasureSerializer.NonAggrBitmapMeasureSerializer(DataType.getType("nbitmap")); + + testSerde(counter, one, serializer); + } + + @Test + public void testHllcSerde() { + NonAggrCounter counter = new NonAggrHLLCounter(14); + NonAggrCounter one = new NonAggrHLLCounter(14); + counter.add("hello"); + counter.add("world"); + NonAggrMeasureSerializer serializer = new NonAggrMeasureSerializer.NonAggrHLLCMeasureSerializer(DataType.getType("nhllc14")); + + testSerde(counter, one, serializer); + } +} diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrSerializerTest.java new file mode 100644 index 0000000..e54b8b8 --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/measure/nonaggregation/NonAggrSerializerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.nonaggregation; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; + +import org.apache.kylin.measure.nonaggregation.NonAggrCounter.NonAggrStatus; +import org.apache.kylin.metadata.datatype.DataType; +import org.junit.Test; + +public class NonAggrSerializerTest { + public void testSerde(NonAggrCounter counter, NonAggrMeasureSerializer serializer) { + counter.add(1); + counter.add(3333); + counter.add("123".getBytes()); + counter.add(456); + + long count = counter.getCountValue(); + ByteBuffer buffer = ByteBuffer.allocate(10 * 1024 * 1024); + serializer.serialize(counter, buffer); + int len = buffer.position(); + + buffer.position(0); + NonAggrCounter one = serializer.deserialize(buffer); + assertTrue(one.finalCount == NonAggrCounter.INVALID_VALUE); + assertTrue(one.status == NonAggrStatus.INTERMEDIATE); +// System.out.println("new count : " + one.getCountValue()); + assertTrue(count == one.getCountValue()); + + buffer.position(0); + assertTrue(len == serializer.peekLength(buffer)); + } + + @Test + public void testBitmapSerde() { + NonAggrCounter counter = new NonAggrBitmapCounter(); + counter.add(1000); + counter.add(111); + NonAggrMeasureSerializer serializer = new NonAggrMeasureSerializer.NonAggrBitmapMeasureSerializer(DataType.getType("nbitmap")); + + testSerde(counter, serializer); + } + + @Test + public void testHllcSerde() { + NonAggrCounter counter = new NonAggrHLLCounter(14); + counter.add("hello"); + counter.add("world"); + NonAggrMeasureSerializer serializer = new NonAggrMeasureSerializer.NonAggrHLLCMeasureSerializer(DataType.getType("nhllc14")); + + testSerde(counter, serializer); + } +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java index 490031e..d54d171 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java @@ -62,6 +62,9 @@ public class KeyValueCreator { for (int i = 0; i < measures.size(); i++) { if (refIndex.length <= i || refIndex[i] != i) isFullCopy = false; + if(measures.get(i).getFunction().getReturnDataType().isNonAggrType()) { + isFullCopy = false; + } } } @@ -74,8 +77,8 @@ public class KeyValueCreator { colValues[i] = measureValues[refIndex[i]]; } - ByteBuffer valueBuf = codec.encode(colValues); - +// ByteBuffer valueBuf = codec.encode(colValues); + ByteBuffer valueBuf = codec.flush(colValues); return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position()); } -- 1.7.10.4