diff --git a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java index e9fe8fa..133ef0a 100644 --- a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java +++ b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java @@ -789,6 +789,15 @@ {"FilterTimestampColumnBetween", ""}, {"FilterTimestampColumnBetween", "!"}, + // This is for runtime min/max pushdown - don't need to do NOT BETWEEN + {"FilterColumnBetweenDynamicValue", "long", ""}, + {"FilterColumnBetweenDynamicValue", "double", ""}, + {"FilterColumnBetweenDynamicValue", "decimal", ""}, + {"FilterColumnBetweenDynamicValue", "string", ""}, + {"FilterColumnBetweenDynamicValue", "char", ""}, + {"FilterColumnBetweenDynamicValue", "varchar", ""}, + {"FilterColumnBetweenDynamicValue", "timestamp", ""}, + {"ColumnCompareColumn", "Equal", "long", "double", "=="}, {"ColumnCompareColumn", "Equal", "double", "double", "=="}, {"ColumnCompareColumn", "NotEqual", "long", "double", "!="}, @@ -1164,6 +1173,8 @@ private void generate() throws Exception { } else if (tdesc[0].equals("FilterColumnBetween")) { generateFilterColumnBetween(tdesc); + } else if (tdesc[0].equals("FilterColumnBetweenDynamicValue")) { + generateFilterColumnBetweenDynamicValue(tdesc); } else if (tdesc[0].equals("ScalarArithmeticColumn") || tdesc[0].equals("ScalarDivideColumn")) { generateScalarArithmeticColumn(tdesc); } else if (tdesc[0].equals("FilterColumnCompareColumn")) { @@ -1379,6 +1390,72 @@ private void generateFilterColumnBetween(String[] tdesc) throws Exception { className, templateString); } + private void generateFilterColumnBetweenDynamicValue(String[] tdesc) throws Exception { + String operandType = tdesc[1]; + String optionalNot = tdesc[2]; + + String className = "Filter" + getCamelCaseType(operandType) + "Column" + + (optionalNot.equals("!") ? "Not" : "") + "BetweenDynamicValue"; + + String typeName = getCamelCaseType(operandType); + String defaultValue; + String vectorType; + String getPrimitiveMethod; + String getValueMethod; + + if (operandType.equals("long")) { + defaultValue = "0"; + vectorType = "long"; + getPrimitiveMethod = "getLong"; + getValueMethod = ""; + } else if (operandType.equals("double")) { + defaultValue = "0"; + vectorType = "double"; + getPrimitiveMethod = "getDouble"; + getValueMethod = ""; + } else if (operandType.equals("decimal")) { + defaultValue = "null"; + vectorType = "HiveDecimal"; + getPrimitiveMethod = "getHiveDecimal"; + getValueMethod = ""; + } else if (operandType.equals("string")) { + defaultValue = "null"; + vectorType = "byte[]"; + getPrimitiveMethod = "getString"; + getValueMethod = ".getBytes()"; + } else if (operandType.equals("char")) { + defaultValue = "null"; + vectorType = "byte[]"; + getPrimitiveMethod = "getHiveChar"; + getValueMethod = ".getStrippedValue().getBytes()"; // Does vectorization use stripped char values? + } else if (operandType.equals("varchar")) { + defaultValue = "null"; + vectorType = "byte[]"; + getPrimitiveMethod = "getHiveVarchar"; + getValueMethod = ".getValue().getBytes()"; + } else if (operandType.equals("timestamp")) { + defaultValue = "null"; + vectorType = "Timestamp"; + getPrimitiveMethod = "getTimestamp"; + getValueMethod = ""; + } else { + throw new IllegalArgumentException("Type " + operandType + " not supported"); + } + + // Read the template into a string, expand it, and write it. 
+    File templateFile = new File(joinPath(this.expressionTemplateDirectory, tdesc[0] + ".txt"));
+    String templateString = readFile(templateFile);
+    templateString = templateString.replaceAll("<ClassName>", className);
+    templateString = templateString.replaceAll("<TypeName>", typeName);
+    templateString = templateString.replaceAll("<DefaultValue>", defaultValue);
+    templateString = templateString.replaceAll("<VectorType>", vectorType);
+    templateString = templateString.replaceAll("<GetPrimitiveMethod>", getPrimitiveMethod);
+    templateString = templateString.replaceAll("<GetValueMethod>", getValueMethod);
+
+    writeFile(templateFile.lastModified(), expressionOutputDirectory, expressionClassesDirectory,
+        className, templateString);
+  }
+
   private void generateColumnCompareColumn(String[] tdesc) throws Exception {
     String operatorName = tdesc[1];
     String operandType1 = tdesc[2];
@@ -3084,6 +3161,12 @@ static String getCamelCaseType(String type) {
       return "Timestamp";
     } else if (type.equals("date")) {
       return "Date";
+    } else if (type.equals("string")) {
+      return "String";
+    } else if (type.equals("char")) {
+      return "Char";
+    } else if (type.equals("varchar")) {
+      return "VarChar";
     } else {
       return type;
     }
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e966959..8b3f589 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -611,6 +611,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   vector_udf1.q,\
   vectorization_short_regress.q,\
   vectorized_dynamic_partition_pruning.q,\
+  vectorized_dynamic_semijoin_reduction.q,\
   vectorized_ptf.q,\
   windowing.q,\
   windowing_gby.q,\
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
new file mode 100644
index 0000000..97ab7aa
--- /dev/null
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.gen;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.Filter<TypeName>ColumnBetween;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Timestamp;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+
+public class <ClassName> extends Filter<TypeName>ColumnBetween {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(<ClassName>.class);
+
+  protected DynamicValue leftDynamicValue;
+  protected DynamicValue rightDynamicValue;
+  protected transient boolean initialized = false;
+  protected transient boolean isLeftOrRightNull = false;
+
+  public <ClassName>(int colNum, DynamicValue leftValue, DynamicValue rightValue) {
+    super(colNum, <DefaultValue>, <DefaultValue>);
+    this.leftDynamicValue = leftValue;
+    this.rightDynamicValue = rightValue;
+  }
+
+  public <ClassName>() {
+  }
+
+  public DynamicValue getLeftDynamicValue() {
+    return leftDynamicValue;
+  }
+
+  public void setLeftDynamicValue(DynamicValue leftValue) {
+    this.leftDynamicValue = leftValue;
+  }
+
+  public DynamicValue getRightDynamicValue() {
+    return rightDynamicValue;
+  }
+
+  public void setRightDynamicValue(DynamicValue rightValue) {
+    this.rightDynamicValue = rightValue;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    leftDynamicValue.setConf(conf);
+    rightDynamicValue.setConf(conf);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+    if (!initialized) {
+      Object lVal = leftDynamicValue.getValue();
+      Object rVal = rightDynamicValue.getValue();
+      if (lVal == null || rVal == null) {
+        isLeftOrRightNull = true;
+      } else {
+        <VectorType> min = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+            lVal, leftDynamicValue.getObjectInspector())<GetValueMethod>;
+        setLeftValue(min);
+
+        <VectorType> max = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+            rVal, rightDynamicValue.getObjectInspector())<GetValueMethod>;
+        setRightValue(max);
+      }
+      initialized = true;
+    }
+
+    // Special case for dynamic values - min/max can be null
+    if (isLeftOrRightNull) {
+      // Entire batch is filtered out
+      batch.size = 0;
+    }
+
+    super.evaluate(batch);
+  }
+}
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
index d68edfa..62d2254 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
@@ -154,6 +154,22 @@ public class <ClassName> extends VectorExpression {
     return "boolean";
   }
 
+  public HiveDecimal getLeftValue() {
+    return leftValue;
+  }
+
+  public void setLeftValue(HiveDecimal value) {
+    this.leftValue = value;
+  }
+
+  public HiveDecimal getRightValue() {
+    return rightValue;
+  }
+
+  public void setRightValue(HiveDecimal value) {
+    this.rightValue = value;
+  }
+
   @Override
   public VectorExpressionDescriptor.Descriptor getDescriptor() {
     return (new VectorExpressionDescriptor.Builder())
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
index e8049da..16d4aaf 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
@@ -161,19
+161,19 @@ public class extends VectorExpression { this.colNum = colNum; } - public byte[] getLeft() { + public byte[] getLeftValue() { return left; } - public void setLeft(byte[] value) { + public void setLeftValue(byte[] value) { this.left = value; } - public byte[] getRight() { + public byte[] getRightValue() { return right; } - public void setRight(byte[] value) { + public void setRightValue(byte[] value) { this.right = value; } diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt index 4298d79..806148f 100644 --- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt +++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt @@ -153,6 +153,22 @@ public class extends VectorExpression { return "boolean"; } + public Timestamp getLeftValue() { + return leftValue; + } + + public void setLeftValue(Timestamp value) { + this.leftValue = value; + } + + public Timestamp getRightValue() { + return rightValue; + } + + public void setRightValue(Timestamp value) { + this.rightValue = value; + } + @Override public VectorExpressionDescriptor.Descriptor getDescriptor() { return (new VectorExpressionDescriptor.Builder()) diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt index 94a174d..d350dcb 100644 --- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt +++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt @@ -163,19 +163,19 @@ public class extends VectorExpression { this.colNum = colNum; } - public byte[] getLeft() { + public byte[] getLeftValue() { return left; } - public void setLeft(byte[] value) { + public void setLeftValue(byte[] value) { this.left = value; } - public byte[] getRight() { + public byte[] getRightValue() { return right; } - public void setRight(byte[] value) { + public void setRightValue(byte[] value) { this.right = value; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java index 7bbedf6..b7687c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java @@ -122,9 +122,15 @@ public void init(RegistryConf conf) throws Exception { setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), val); } } - // For now, expecting a single row (min/max, aggregated bloom filter) - if (rowCount != 1) { - throw new IllegalStateException("Expected 1 row from " + inputSourceName + ", got " + rowCount); + // For now, expecting a single row (min/max, aggregated bloom filter), or no rows + if (rowCount == 0) { + LOG.debug("No input rows from " + inputSourceName + ", filling dynamic values with nulls"); + for (int colIdx = 0; colIdx < colExprEvaluators.size(); ++colIdx) { + ExprNodeEvaluator eval = colExprEvaluators.get(colIdx); + setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), null); + } + } else if (rowCount > 1) { + throw new IllegalStateException("Expected 0 or 1 rows from " + inputSourceName + ", got " + rowCount); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java index 
217af3f..f4499d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java @@ -183,7 +183,8 @@ public static String getVectorColumnSimpleName(String hiveTypeName) { public enum InputExpressionType { NONE(0), COLUMN(1), - SCALAR(2); + SCALAR(2), + DYNAMICVALUE(3); private final int value; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java index 261246b..2598445 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java @@ -72,6 +72,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { try { heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT); + + conditionEvaluator.init(hconf); } catch (Throwable e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java index f7fec8f..bb382b1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java @@ -113,6 +113,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { projectedColumns = new int [vExpressions.length]; for (int i = 0; i < projectedColumns.length; i++) { + vExpressions[i].init(hconf); projectedColumns[i] = vExpressions[i].getOutputColumn(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index c887757..484f615 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -54,6 +54,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgTimestamp; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilter; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilterMerge; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar; @@ -97,6 +99,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.udf.SettableUDF; @@ -585,6 +588,8 @@ public VectorExpression getVectorExpression(ExprNodeDesc exprDesc, VectorExpress } else if (exprDesc instanceof ExprNodeConstantDesc) { ve = getConstantVectorExpression(((ExprNodeConstantDesc) exprDesc).getValue(), exprDesc.getTypeInfo(), mode); + } else if (exprDesc instanceof ExprNodeDynamicValueDesc) { + ve = 
getDynamicValueVectorExpression((ExprNodeDynamicValueDesc) exprDesc, mode); } if (ve == null) { throw new HiveException( @@ -1094,6 +1099,21 @@ private VectorExpression getConstantVectorExpression(Object constantValue, TypeI } } + private VectorExpression getDynamicValueVectorExpression(ExprNodeDynamicValueDesc dynamicValueExpr, + VectorExpressionDescriptor.Mode mode) throws HiveException { + String typeName = dynamicValueExpr.getTypeInfo().getTypeName(); + VectorExpressionDescriptor.ArgumentType vectorArgType = VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(typeName); + if (vectorArgType == VectorExpressionDescriptor.ArgumentType.NONE) { + throw new HiveException("No vector argument type for type name " + typeName); + } + int outCol = -1; + if (mode == VectorExpressionDescriptor.Mode.PROJECTION) { + outCol = ocm.allocateOutputColumn(dynamicValueExpr.getTypeInfo()); + } + + return new DynamicValueVectorExpression(outCol, dynamicValueExpr.getTypeInfo(), dynamicValueExpr.getDynamicValue()); + } + /** * Used as a fast path for operations that don't modify their input, like unary + * and casting boolean to long. IdentityExpression and its children are always @@ -1181,6 +1201,8 @@ private VectorExpression getVectorExpressionForUdf(GenericUDF genericeUdf, builder.setInputExpressionType(i, InputExpressionType.COLUMN); } else if (child instanceof ExprNodeConstantDesc) { builder.setInputExpressionType(i, InputExpressionType.SCALAR); + } else if (child instanceof ExprNodeDynamicValueDesc) { + builder.setInputExpressionType(i, InputExpressionType.DYNAMICVALUE); } else { throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName()); } @@ -1225,6 +1247,8 @@ private VectorExpression createVectorExpression(Class vectorClass, } else if (child instanceof ExprNodeConstantDesc) { Object scalarValue = getVectorTypeScalarValue((ExprNodeConstantDesc) child); arguments[i] = (null == scalarValue) ? getConstantVectorExpression(null, child.getTypeInfo(), childrenMode) : scalarValue; + } else if (child instanceof ExprNodeDynamicValueDesc) { + arguments[i] = ((ExprNodeDynamicValueDesc) child).getDynamicValue(); } else { throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName()); } @@ -2092,8 +2116,13 @@ private VectorExpression getBetweenFilterExpression(List childExpr return null; } + boolean hasDynamicValues = false; + // We don't currently support the BETWEEN ends being columns. They must be scalars. - if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) || + if ((childExpr.get(2) instanceof ExprNodeDynamicValueDesc) && + (childExpr.get(3) instanceof ExprNodeDynamicValueDesc)) { + hasDynamicValues = true; + } else if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) || !(childExpr.get(3) instanceof ExprNodeConstantDesc)) { return null; } @@ -2138,35 +2167,51 @@ private VectorExpression getBetweenFilterExpression(List childExpr // determine class Class cl = null; if (isIntFamily(colType) && !notKeywordPresent) { - cl = FilterLongColumnBetween.class; + cl = (hasDynamicValues ? + FilterLongColumnBetweenDynamicValue.class : + FilterLongColumnBetween.class); } else if (isIntFamily(colType) && notKeywordPresent) { cl = FilterLongColumnNotBetween.class; } else if (isFloatFamily(colType) && !notKeywordPresent) { - cl = FilterDoubleColumnBetween.class; + cl = (hasDynamicValues ? 
+ FilterDoubleColumnBetweenDynamicValue.class : + FilterDoubleColumnBetween.class); } else if (isFloatFamily(colType) && notKeywordPresent) { cl = FilterDoubleColumnNotBetween.class; } else if (colType.equals("string") && !notKeywordPresent) { - cl = FilterStringColumnBetween.class; + cl = (hasDynamicValues ? + FilterStringColumnBetweenDynamicValue.class : + FilterStringColumnBetween.class); } else if (colType.equals("string") && notKeywordPresent) { cl = FilterStringColumnNotBetween.class; } else if (varcharTypePattern.matcher(colType).matches() && !notKeywordPresent) { - cl = FilterVarCharColumnBetween.class; + cl = (hasDynamicValues ? + FilterVarCharColumnBetweenDynamicValue.class : + FilterVarCharColumnBetween.class); } else if (varcharTypePattern.matcher(colType).matches() && notKeywordPresent) { cl = FilterVarCharColumnNotBetween.class; } else if (charTypePattern.matcher(colType).matches() && !notKeywordPresent) { - cl = FilterCharColumnBetween.class; + cl = (hasDynamicValues ? + FilterCharColumnBetweenDynamicValue.class : + FilterCharColumnBetween.class); } else if (charTypePattern.matcher(colType).matches() && notKeywordPresent) { cl = FilterCharColumnNotBetween.class; } else if (colType.equals("timestamp") && !notKeywordPresent) { - cl = FilterTimestampColumnBetween.class; + cl = (hasDynamicValues ? + FilterTimestampColumnBetweenDynamicValue.class : + FilterTimestampColumnBetween.class); } else if (colType.equals("timestamp") && notKeywordPresent) { cl = FilterTimestampColumnNotBetween.class; } else if (isDecimalFamily(colType) && !notKeywordPresent) { - cl = FilterDecimalColumnBetween.class; + cl = (hasDynamicValues ? + FilterDecimalColumnBetweenDynamicValue.class : + FilterDecimalColumnBetween.class); } else if (isDecimalFamily(colType) && notKeywordPresent) { cl = FilterDecimalColumnNotBetween.class; } else if (isDateFamily(colType) && !notKeywordPresent) { - cl = FilterLongColumnBetween.class; + cl = (hasDynamicValues ? + FilterLongColumnBetweenDynamicValue.class : + FilterLongColumnBetween.class); } else if (isDateFamily(colType) && notKeywordPresent) { cl = FilterLongColumnNotBetween.class; } @@ -2224,6 +2269,12 @@ private VectorExpression getCustomUDFExpression(ExprNodeGenericFuncDesc expr, Ve } else if (child instanceof ExprNodeConstantDesc) { // this is a constant (or null) argDescs[i].setConstant((ExprNodeConstantDesc) child); + } else if (child instanceof ExprNodeDynamicValueDesc) { + VectorExpression e = getVectorExpression(child, VectorExpressionDescriptor.Mode.PROJECTION); + vectorExprs.add(e); + variableArgPositions.add(i); + exprResultColumnNums.add(e.getOutputColumn()); + argDescs[i].setVariable(e.getOutputColumn()); } else { throw new HiveException("Unable to vectorize custom UDF. Encountered unsupported expr desc : " + child); @@ -2651,6 +2702,14 @@ public static String mapTypeNameSynonyms(String typeName) { add(new AggregateDefinition("stddev_samp", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFStdSampDouble.class)); add(new AggregateDefinition("stddev_samp", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFStdSampDecimal.class)); add(new AggregateDefinition("stddev_samp", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFStdSampTimestamp.class)); + + // UDAFBloomFilter. 
Original data is one type, partial/final is another, + // so this requires 2 aggregation classes (partial1/complete), (partial2/final) + add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY, Mode.PARTIAL1, VectorUDAFBloomFilter.class)); + add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY, Mode.COMPLETE, VectorUDAFBloomFilter.class)); + add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY, Mode.PARTIAL2, VectorUDAFBloomFilterMerge.class)); + add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY, Mode.FINAL, VectorUDAFBloomFilterMerge.class)); + }}; public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java new file mode 100644 index 0000000..1a34118 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions; + +import java.sql.Timestamp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; +import org.apache.hadoop.hive.ql.exec.vector.*; +import org.apache.hadoop.hive.ql.plan.DynamicValue; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Constant is represented as a vector with repeating values. 
+ */ +public class DynamicValueVectorExpression extends VectorExpression { + private static final Logger LOG = LoggerFactory.getLogger(DynamicValueVectorExpression.class); + + private static final long serialVersionUID = 1L; + + DynamicValue dynamicValue; + TypeInfo typeInfo; + transient private boolean initialized = false; + + private int outputColumn; + protected long longValue = 0; + private double doubleValue = 0; + private byte[] bytesValue = null; + private HiveDecimal decimalValue = null; + private Timestamp timestampValue = null; + private HiveIntervalDayTime intervalDayTimeValue = null; + private boolean isNullValue = false; + + private ColumnVector.Type type; + private int bytesValueLength = 0; + + public DynamicValueVectorExpression() { + super(); + } + + public DynamicValueVectorExpression(int outputColumn, TypeInfo typeInfo, DynamicValue dynamicValue) { + this(); + this.outputColumn = outputColumn; + this.type = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo); + this.dynamicValue = dynamicValue; + this.typeInfo = typeInfo; + } + + private void evaluateLong(VectorizedRowBatch vrg) { + LongColumnVector cv = (LongColumnVector) vrg.cols[outputColumn]; + cv.isRepeating = true; + cv.noNulls = !isNullValue; + if (!isNullValue) { + cv.vector[0] = longValue; + cv.isNull[0] = false; + } else { + cv.isNull[0] = true; + } + } + + private void evaluateDouble(VectorizedRowBatch vrg) { + DoubleColumnVector cv = (DoubleColumnVector) vrg.cols[outputColumn]; + cv.isRepeating = true; + cv.noNulls = !isNullValue; + if (!isNullValue) { + cv.vector[0] = doubleValue; + cv.isNull[0] = false; + } else { + cv.isNull[0] = true; + } + } + + private void evaluateBytes(VectorizedRowBatch vrg) { + BytesColumnVector cv = (BytesColumnVector) vrg.cols[outputColumn]; + cv.isRepeating = true; + cv.noNulls = !isNullValue; + cv.initBuffer(); + if (!isNullValue) { + cv.setVal(0, bytesValue, 0, bytesValueLength); + cv.isNull[0] = false; + } else { + cv.isNull[0] = true; + } + } + + private void evaluateDecimal(VectorizedRowBatch vrg) { + DecimalColumnVector dcv = (DecimalColumnVector) vrg.cols[outputColumn]; + dcv.isRepeating = true; + dcv.noNulls = !isNullValue; + if (!isNullValue) { + dcv.vector[0].set(decimalValue); + dcv.isNull[0] = false; + } else { + dcv.isNull[0] = true; + } + } + + private void evaluateTimestamp(VectorizedRowBatch vrg) { + TimestampColumnVector dcv = (TimestampColumnVector) vrg.cols[outputColumn]; + dcv.isRepeating = true; + dcv.noNulls = !isNullValue; + if (!isNullValue) { + dcv.set(0, timestampValue); + dcv.isNull[0] = false; + } else { + dcv.isNull[0] = true; + } + } + + private void evaluateIntervalDayTime(VectorizedRowBatch vrg) { + IntervalDayTimeColumnVector dcv = (IntervalDayTimeColumnVector) vrg.cols[outputColumn]; + dcv.isRepeating = true; + dcv.noNulls = !isNullValue; + if (!isNullValue) { + dcv.set(0, intervalDayTimeValue); + dcv.isNull[0] = false; + } else { + dcv.isNull[0] = true; + } + } + + private void initValue() { + Object val = dynamicValue.getValue(); + + if (val == null) { + isNullValue = true; + } else { + PrimitiveObjectInspector poi = dynamicValue.getObjectInspector(); + byte[] bytesVal; + switch (poi.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + longValue = PrimitiveObjectInspectorUtils.getLong(val, poi); + break; + case FLOAT: + case DOUBLE: + doubleValue = PrimitiveObjectInspectorUtils.getDouble(val, poi); + break; + case STRING: + case CHAR: + case VARCHAR: + bytesVal = 
PrimitiveObjectInspectorUtils.getString(val, poi).getBytes(); + setBytesValue(bytesVal); + break; + case BINARY: + bytesVal = PrimitiveObjectInspectorUtils.getBinary(val, poi).copyBytes(); + setBytesValue(bytesVal); + break; + case DECIMAL: + decimalValue = PrimitiveObjectInspectorUtils.getHiveDecimal(val, poi); + break; + case DATE: + longValue = DateWritable.dateToDays(PrimitiveObjectInspectorUtils.getDate(val, poi)); + case TIMESTAMP: + timestampValue = PrimitiveObjectInspectorUtils.getTimestamp(val, poi); + break; + case INTERVAL_YEAR_MONTH: + longValue = PrimitiveObjectInspectorUtils.getHiveIntervalYearMonth(val, poi).getTotalMonths(); + break; + case INTERVAL_DAY_TIME: + intervalDayTimeValue = PrimitiveObjectInspectorUtils.getHiveIntervalDayTime(val, poi); + break; + default: + throw new IllegalStateException("Unsupported type " + poi.getPrimitiveCategory()); + } + } + + initialized = true; + } + + @Override + public void init(Configuration conf) { + super.init(conf); + dynamicValue.setConf(conf); + } + + @Override + public void evaluate(VectorizedRowBatch vrg) { + if (!initialized) { + initValue(); + } + + switch (type) { + case LONG: + evaluateLong(vrg); + break; + case DOUBLE: + evaluateDouble(vrg); + break; + case BYTES: + evaluateBytes(vrg); + break; + case DECIMAL: + evaluateDecimal(vrg); + break; + case TIMESTAMP: + evaluateTimestamp(vrg); + break; + case INTERVAL_DAY_TIME: + evaluateIntervalDayTime(vrg); + break; + default: + throw new IllegalStateException("Unsupported type " + type); + } + } + + @Override + public int getOutputColumn() { + return outputColumn; + } + + public long getLongValue() { + return longValue; + } + + public void setLongValue(long longValue) { + this.longValue = longValue; + } + + public double getDoubleValue() { + return doubleValue; + } + + public void setDoubleValue(double doubleValue) { + this.doubleValue = doubleValue; + } + + public byte[] getBytesValue() { + return bytesValue; + } + + public void setBytesValue(byte[] bytesValue) { + this.bytesValue = bytesValue.clone(); + this.bytesValueLength = bytesValue.length; + } + + public void setDecimalValue(HiveDecimal decimalValue) { + this.decimalValue = decimalValue; + } + + public HiveDecimal getDecimalValue() { + return decimalValue; + } + + public void setTimestampValue(Timestamp timestampValue) { + this.timestampValue = timestampValue; + } + + public Timestamp getTimestampValue() { + return timestampValue; + } + + public void setIntervalDayTimeValue(HiveIntervalDayTime intervalDayTimeValue) { + this.intervalDayTimeValue = intervalDayTimeValue; + } + + public HiveIntervalDayTime getIntervalDayTimeValue() { + return intervalDayTimeValue; + } + + public String getTypeString() { + return getOutputType(); + } + + public void setOutputColumn(int outputColumn) { + this.outputColumn = outputColumn; + } + + @Override + public VectorExpressionDescriptor.Descriptor getDescriptor() { + return (new VectorExpressionDescriptor.Builder()).build(); + } + + public DynamicValue getDynamicValue() { + return dynamicValue; + } + + public void setDynamicValue(DynamicValue dynamicValue) { + this.dynamicValue = dynamicValue; + } + + public TypeInfo getTypeInfo() { + return typeInfo; + } + + public void setTypeInfo(TypeInfo typeInfo) { + this.typeInfo = typeInfo; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java index 8fca8a1..218f306 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java @@ -22,6 +22,8 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -31,7 +33,7 @@ public abstract class VectorExpression implements Serializable { public enum Type { STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL, - INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, OTHER; + INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, BINARY, OTHER; private static Map types = ImmutableMap.builder() .put("string", STRING) .put("char", CHAR) @@ -43,6 +45,7 @@ .put("decimal", DECIMAL) .put("interval_year_month", INTERVAL_YEAR_MONTH) .put("interval_day_time", INTERVAL_DAY_TIME) + .put("binary", BINARY) .build(); public static Type getValue(String name) { @@ -76,6 +79,14 @@ public static Type getValue(String name) { */ public abstract void evaluate(VectorizedRowBatch batch); + public void init(Configuration conf) { + if (childExpressions != null) { + for (VectorExpression child : childExpressions) { + child.init(conf); + } + } + } + /** * Returns the index of the output column in the array * of column vectors. If not applicable, -1 is returned. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java new file mode 100644 index 0000000..188a87e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions; + +import java.io.ByteArrayInputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.plan.DynamicValue; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hive.common.util.BloomFilter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VectorInBloomFilterColDynamicValue extends VectorExpression { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(VectorInBloomFilterColDynamicValue.class); + + protected int colNum; + protected DynamicValue bloomFilterDynamicValue; + protected transient boolean initialized = false; + protected transient BloomFilter bloomFilter; + protected transient BloomFilterCheck bfCheck; + + public VectorInBloomFilterColDynamicValue(int colNum, DynamicValue bloomFilterDynamicValue) { + this.colNum = colNum; + this.bloomFilterDynamicValue = bloomFilterDynamicValue; + } + + public VectorInBloomFilterColDynamicValue() { + } + + @Override + public void init(Configuration conf) { + super.init(conf); + bloomFilterDynamicValue.setConf(conf); + + // Instantiate BloomFilterCheck based on input column type + VectorExpression.Type colType = this.getInputTypes()[0]; + switch (colType) { + case LONG: + case DATE: + bfCheck = new LongBloomFilterCheck(); + break; + case DOUBLE: + bfCheck = new DoubleBloomFilterCheck(); + break; + case DECIMAL: + bfCheck = new DecimalBloomFilterCheck(); + break; + case STRING: + case CHAR: + case VARCHAR: + case BINARY: + bfCheck = new BytesBloomFilterCheck(); + break; + case TIMESTAMP: + bfCheck = new TimestampBloomFilterCheck(); + break; + default: + throw new IllegalStateException("Unsupported type " + colType); + } + } + + private void initValue() { + try { + Object val = bloomFilterDynamicValue.getValue(); + if (val != null) { + BinaryObjectInspector boi = (BinaryObjectInspector) bloomFilterDynamicValue.getObjectInspector(); + byte[] bytes = boi.getPrimitiveJavaObject(val); + bloomFilter = BloomFilter.deserialize(new ByteArrayInputStream(bytes)); + } else { + bloomFilter = null; + } + initialized = true; + } catch (Exception err) { + throw new RuntimeException(err); + } + } + + @Override + public void evaluate(VectorizedRowBatch batch) { + if (childExpressions != null) { + super.evaluateChildren(batch); + } + + if (!initialized) { + initValue(); + } + + ColumnVector inputColVector = batch.cols[colNum]; + int[] sel = batch.selected; + boolean[] nullPos = inputColVector.isNull; + int n = batch.size; + + // return immediately if batch is empty + if (n == 0) { + return; + } + + // In case the dynamic value resolves to a null value + if (bloomFilter == null) { + batch.size = 0; + } + + if (inputColVector.noNulls) { + if (inputColVector.isRepeating) { + 
+ // All must be selected otherwise size would be zero. Repeating property will not change. + if (!(bfCheck.checkValue(inputColVector, 0))) { + + //Entire batch is filtered out. + batch.size = 0; + } + } else if (batch.selectedInUse) { + int newSize = 0; + for(int j=0; j != n; j++) { + int i = sel[j]; + if (bfCheck.checkValue(inputColVector, i)) { + sel[newSize++] = i; + } + } + batch.size = newSize; + } else { + int newSize = 0; + for(int i = 0; i != n; i++) { + if (bfCheck.checkValue(inputColVector, i)) { + sel[newSize++] = i; + } + } + if (newSize < n) { + batch.size = newSize; + batch.selectedInUse = true; + } + } + } else { + if (inputColVector.isRepeating) { + + // All must be selected otherwise size would be zero. Repeating property will not change. + if (!nullPos[0]) { + if (!(bfCheck.checkValue(inputColVector, 0))) { + + //Entire batch is filtered out. + batch.size = 0; + } + } else { + batch.size = 0; + } + } else if (batch.selectedInUse) { + int newSize = 0; + for(int j=0; j != n; j++) { + int i = sel[j]; + if (!nullPos[i]) { + if (bfCheck.checkValue(inputColVector, i)) { + sel[newSize++] = i; + } + } + } + + //Change the selected vector + batch.size = newSize; + } else { + int newSize = 0; + for(int i = 0; i != n; i++) { + if (!nullPos[i]) { + if (bfCheck.checkValue(inputColVector, i)) { + sel[newSize++] = i; + } + } + } + if (newSize < n) { + batch.size = newSize; + batch.selectedInUse = true; + } + } + } + } + + @Override + public int getOutputColumn() { + return -1; + } + + @Override + public String getOutputType() { + return "boolean"; + } + + public int getColNum() { + return colNum; + } + + public void setColNum(int colNum) { + this.colNum = colNum; + } + + @Override + public Descriptor getDescriptor() { + VectorExpressionDescriptor.Builder b = new VectorExpressionDescriptor.Builder(); + b.setMode(VectorExpressionDescriptor.Mode.FILTER) + .setNumArguments(2) + .setArgumentTypes( + VectorExpressionDescriptor.ArgumentType.ALL_FAMILY, + VectorExpressionDescriptor.ArgumentType.BINARY) + .setInputExpressionTypes( + VectorExpressionDescriptor.InputExpressionType.COLUMN, + VectorExpressionDescriptor.InputExpressionType.DYNAMICVALUE); + return b.build(); + } + + // Type-specific handling + abstract class BloomFilterCheck { + abstract public boolean checkValue(ColumnVector columnVector, int idx); + } + + class BytesBloomFilterCheck extends BloomFilterCheck { + @Override + public boolean checkValue(ColumnVector columnVector, int idx) { + BytesColumnVector col = (BytesColumnVector) columnVector; + return bloomFilter.testBytes(col.vector[idx], col.start[idx], col.length[idx]); + } + } + + class LongBloomFilterCheck extends BloomFilterCheck { + @Override + public boolean checkValue(ColumnVector columnVector, int idx) { + LongColumnVector col = (LongColumnVector) columnVector; + return bloomFilter.testLong(col.vector[idx]); + } + } + + class DoubleBloomFilterCheck extends BloomFilterCheck { + @Override + public boolean checkValue(ColumnVector columnVector, int idx) { + DoubleColumnVector col = (DoubleColumnVector) columnVector; + return bloomFilter.testDouble(col.vector[idx]); + } + } + + class DecimalBloomFilterCheck extends BloomFilterCheck { + private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; + + @Override + public boolean checkValue(ColumnVector columnVector, int idx) { + DecimalColumnVector col = (DecimalColumnVector) columnVector; + int startIdx = col.vector[idx].toBytes(scratchBuffer); + return bloomFilter.testBytes(scratchBuffer, startIdx, 
scratchBuffer.length - startIdx); + } + } + + class TimestampBloomFilterCheck extends BloomFilterCheck { + @Override + public boolean checkValue(ColumnVector columnVector, int idx) { + TimestampColumnVector col = (TimestampColumnVector) columnVector; + return bloomFilter.testLong(col.time[idx]); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java new file mode 100644 index 0000000..3ecb82e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java @@ -0,0 +1,474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator; +import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hive.common.util.BloomFilter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VectorUDAFBloomFilter extends VectorAggregateExpression { + + private static final Logger LOG = LoggerFactory.getLogger(VectorUDAFBloomFilter.class); + + private static final long serialVersionUID = 1L; + + private VectorExpression inputExpression; + private long expectedEntries = -1; + private 
ValueProcessor valueProcessor; + transient private int bitSetSize = -1; + transient private BytesWritable bw = new BytesWritable(); + transient private ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + + /** + * class for storing the current aggregate value. + */ + private static final class Aggregation implements AggregationBuffer { + private static final long serialVersionUID = 1L; + + BloomFilter bf; + + public Aggregation(long expectedEntries) { + bf = new BloomFilter(expectedEntries); + } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + bf.reset(); + } + } + + public VectorUDAFBloomFilter(VectorExpression inputExpression) { + this(); + this.inputExpression = inputExpression; + + // Instantiate the ValueProcessor based on the input type + VectorExpressionDescriptor.ArgumentType inputType = + VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(inputExpression.getOutputType()); + switch (inputType) { + case INT_FAMILY: + case DATE: + valueProcessor = new ValueProcessorLong(); + break; + case FLOAT_FAMILY: + valueProcessor = new ValueProcessorDouble(); + break; + case DECIMAL: + valueProcessor = new ValueProcessorDecimal(); + break; + case STRING: + case CHAR: + case VARCHAR: + case STRING_FAMILY: + case BINARY: + valueProcessor = new ValueProcessorBytes(); + break; + case TIMESTAMP: + valueProcessor = new ValueProcessorTimestamp(); + break; + default: + throw new IllegalStateException("Unsupported type " + inputType); + } + } + + public VectorUDAFBloomFilter() { + super(); + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + if (expectedEntries < 0) { + throw new IllegalStateException("expectedEntries not initialized"); + } + return new Aggregation(expectedEntries); + } + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { + + inputExpression.evaluate(batch); + + ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation) agg; + + if (inputColumn.isRepeating) { + if (inputColumn.noNulls) { + valueProcessor.processValue(myagg, inputColumn, 0); + } + return; + } + + if (!batch.selectedInUse && inputColumn.noNulls) { + iterateNoSelectionNoNulls(myagg, inputColumn, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, inputColumn, batchSize); + } + else if (inputColumn.noNulls){ + iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected); + } + else { + iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected); + } + } + + private void iterateNoSelectionNoNulls( + Aggregation myagg, + ColumnVector inputColumn, + int batchSize) { + for (int i=0; i< batchSize; ++i) { + valueProcessor.processValue(myagg, inputColumn, i); + } + } + + private void iterateNoSelectionHasNulls( + Aggregation myagg, + ColumnVector inputColumn, + int batchSize) { + + for (int i=0; i< batchSize; ++i) { + if (!inputColumn.isNull[i]) { + valueProcessor.processValue(myagg, inputColumn, i); + } + } + } + + private void iterateSelectionNoNulls( + Aggregation myagg, + ColumnVector inputColumn, + int batchSize, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + valueProcessor.processValue(myagg, inputColumn, i); + } + } + + private void iterateSelectionHasNulls( + Aggregation myagg, + ColumnVector 
inputColumn, + int batchSize, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + if (!inputColumn.isNull[i]) { + valueProcessor.processValue(myagg, inputColumn, i); + } + } + } + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()]; + + if (inputColumn.noNulls) { + if (inputColumn.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batchSize); + } + } + } else { + if (inputColumn.isRepeating) { + // All nulls, no-op for min/max + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batchSize, batch.selected); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batchSize); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + valueProcessor.processValue(myagg, inputColumn, 0); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + valueProcessor.processValue(myagg, inputColumn, row); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + valueProcessor.processValue(myagg, inputColumn, i); + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int batchSize, + int[] selection) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + if (!inputColumn.isNull[row]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + valueProcessor.processValue(myagg, inputColumn, i); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + if (!inputColumn.isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + valueProcessor.processValue(myagg, 
inputColumn, i); + } + } + } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + agg.reset(); + } + + @Override + public Object evaluateOutput(AggregationBuffer agg) throws HiveException { + try { + Aggregation bfAgg = (Aggregation) agg; + byteStream.reset(); + BloomFilter.serialize(byteStream, bfAgg.bf); + byte[] bytes = byteStream.toByteArray(); + bw.set(bytes, 0, bytes.length); + return bw; + } catch (IOException err) { + throw new HiveException("Error encountered while serializing bloomfilter", err); + } + } + + @Override + public ObjectInspector getOutputObjectInspector() { + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + } + + @Override + public int getAggregationBufferFixedSize() { + if (bitSetSize < 0) { + // Not pretty, but we need a way to get the size + try { + Aggregation agg = (Aggregation) getNewAggregationBuffer(); + bitSetSize = agg.bf.getBitSet().length; + } catch (Exception e) { + throw new RuntimeException("Unexpected error while creating AggregationBuffer", e); + } + } + + // BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int) + JavaDataModel model = JavaDataModel.get(); + int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize), + model.memoryAlign()); + return JavaDataModel.alignUp( + model.object() + bloomFilterSize + model.primitive1() + model.primitive1(), + model.memoryAlign()); + } + + @Override + public void init(AggregationDesc desc) throws HiveException { + GenericUDAFBloomFilterEvaluator udafBloomFilter = + (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator(); + expectedEntries = udafBloomFilter.getExpectedEntries(); + } + + public VectorExpression getInputExpression() { + return inputExpression; + } + + public void setInputExpression(VectorExpression inputExpression) { + this.inputExpression = inputExpression; + } + + public long getExpectedEntries() { + return expectedEntries; + } + + public void setExpectedEntries(long expectedEntries) { + this.expectedEntries = expectedEntries; + } + + // Type-specific handling done here + private static abstract class ValueProcessor { + abstract protected void processValue(Aggregation myagg, ColumnVector inputColumn, int index); + } + + // + // Type-specific implementations + // + + public static class ValueProcessorBytes extends ValueProcessor { + @Override + protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) { + BytesColumnVector inputColumn = (BytesColumnVector) columnVector; + myagg.bf.addBytes(inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]); + } + } + + public static class ValueProcessorLong extends ValueProcessor { + @Override + protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) { + LongColumnVector inputColumn = (LongColumnVector) columnVector; + myagg.bf.addLong(inputColumn.vector[i]); + } + } + + public static class ValueProcessorDouble extends ValueProcessor { + @Override + protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) { + DoubleColumnVector inputColumn = (DoubleColumnVector) columnVector; + myagg.bf.addDouble(inputColumn.vector[i]); + } + } + + 
public static class ValueProcessorDecimal extends ValueProcessor { + private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; + + @Override + protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) { + DecimalColumnVector inputColumn = (DecimalColumnVector) columnVector; + int startIdx = inputColumn.vector[i].toBytes(scratchBuffer); + myagg.bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx); + } + } + + public static class ValueProcessorTimestamp extends ValueProcessor { + @Override + protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) { + TimestampColumnVector inputColumn = (TimestampColumnVector) columnVector; + myagg.bf.addLong(inputColumn.time[i]); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java new file mode 100644 index 0000000..ad190b7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; + +import java.io.ByteArrayOutputStream; +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hive.common.util.BloomFilter; + +public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression { + + private static final long serialVersionUID = 1L; + + private VectorExpression inputExpression; + private long expectedEntries = -1; + transient private int aggBufferSize = -1; + transient private BytesWritable bw = new BytesWritable(); + + /** + * class for storing the current aggregate value. 
+ */ + private static final class Aggregation implements AggregationBuffer { + private static final long serialVersionUID = 1L; + + byte[] bfBytes; + + public Aggregation(long expectedEntries) { + try { + BloomFilter bf = new BloomFilter(expectedEntries); + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + BloomFilter.serialize(bytesOut, bf); + bfBytes = bytesOut.toByteArray(); + } catch (Exception err) { + throw new IllegalArgumentException("Error creating aggregation buffer", err); + } + } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + // Do not change the initial bytes which contain NumHashFunctions/NumBits! + Arrays.fill(bfBytes, BloomFilter.START_OF_SERIALIZED_LONGS, bfBytes.length, (byte) 0); + } + } + + public VectorUDAFBloomFilterMerge(VectorExpression inputExpression) { + this(); + this.inputExpression = inputExpression; + } + + public VectorUDAFBloomFilterMerge() { + super(); + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + if (expectedEntries < 0) { + throw new IllegalStateException("expectedEntries not initialized"); + } + return new Aggregation(expectedEntries); + } + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { + + inputExpression.evaluate(batch); + + ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation) agg; + + if (inputColumn.isRepeating) { + if (inputColumn.noNulls) { + processValue(myagg, inputColumn, 0); + } + return; + } + + if (!batch.selectedInUse && inputColumn.noNulls) { + iterateNoSelectionNoNulls(myagg, inputColumn, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, inputColumn, batchSize); + } + else if (inputColumn.noNulls){ + iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected); + } + else { + iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected); + } + } + + private void iterateNoSelectionNoNulls( + Aggregation myagg, + ColumnVector inputColumn, + int batchSize) { + for (int i=0; i< batchSize; ++i) { + processValue(myagg, inputColumn, i); + } + } + + private void iterateNoSelectionHasNulls( + Aggregation myagg, + ColumnVector inputColumn, + int batchSize) { + + for (int i=0; i< batchSize; ++i) { + if (!inputColumn.isNull[i]) { + processValue(myagg, inputColumn, i); + } + } + } + + private void iterateSelectionNoNulls( + Aggregation myagg, + ColumnVector inputColumn, + int batchSize, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + processValue(myagg, inputColumn, i); + } + } + + private void iterateSelectionHasNulls( + Aggregation myagg, + ColumnVector inputColumn, + int batchSize, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + if (!inputColumn.isNull[i]) { + processValue(myagg, inputColumn, i); + } + } + } + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()]; + + if (inputColumn.noNulls) { + if (inputColumn.isRepeating) { + 
iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batchSize); + } + } + } else { + if (inputColumn.isRepeating) { + // All nulls, nothing to merge into the bloom filter + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batchSize, batch.selected); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputColumn, batchSize); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + processValue(myagg, inputColumn, 0); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + processValue(myagg, inputColumn, row); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + processValue(myagg, inputColumn, i); + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int batchSize, + int[] selection) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + if (!inputColumn.isNull[row]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + processValue(myagg, inputColumn, row); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + ColumnVector inputColumn, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + if (!inputColumn.isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + processValue(myagg, inputColumn, i); + } + } + } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + agg.reset(); + } + + @Override + public Object evaluateOutput(AggregationBuffer agg) throws HiveException { + Aggregation bfAgg = (Aggregation) agg; + bw.set(bfAgg.bfBytes, 0, bfAgg.bfBytes.length); + return bw; + } + + @Override + public ObjectInspector getOutputObjectInspector() { + return
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + } + + @Override + public int getAggregationBufferFixedSize() { + if (aggBufferSize < 0) { + // Not pretty, but we need a way to get the size + try { + Aggregation agg = (Aggregation) getNewAggregationBuffer(); + aggBufferSize = agg.bfBytes.length; + } catch (Exception e) { + throw new RuntimeException("Unexpected error while creating AggregationBuffer", e); + } + } + + return aggBufferSize; + } + + @Override + public void init(AggregationDesc desc) throws HiveException { + GenericUDAFBloomFilterEvaluator udafBloomFilter = + (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator(); + expectedEntries = udafBloomFilter.getExpectedEntries(); + } + + void processValue(Aggregation myagg, ColumnVector columnVector, int i) { + // columnVector entry is byte array representing serialized BloomFilter. + // BloomFilter.mergeBloomFilterBytes() does a simple byte ORing + // which should be faster than deserialize/merge. + BytesColumnVector inputColumn = (BytesColumnVector) columnVector; + BloomFilter.mergeBloomFilterBytes(myagg.bfBytes, 0, myagg.bfBytes.length, + inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index e3d9d7f..439950b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -335,6 +335,7 @@ public Vectorizer() { supportedGenericUDFs.add(GenericUDFNvl.class); supportedGenericUDFs.add(GenericUDFElt.class); supportedGenericUDFs.add(GenericUDFInitCap.class); + supportedGenericUDFs.add(GenericUDFInBloomFilter.class); // For type casts supportedGenericUDFs.add(UDFToLong.class); @@ -368,6 +369,7 @@ public Vectorizer() { supportedAggregationUdfs.add("stddev"); supportedAggregationUdfs.add("stddev_pop"); supportedAggregationUdfs.add("stddev_samp"); + supportedAggregationUdfs.add("bloom_filter"); } private class VectorTaskColumnInfo { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java index fb9a140..deb0f76 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; @@ -72,6 +73,8 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticE // Bloom filter rest private ByteArrayOutputStream result = new ByteArrayOutputStream(); + private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; + @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); @@ -167,9 +170,10 @@ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveExcep bf.addDouble(vDouble); break; case DECIMAL: - HiveDecimal vDecimal = 
((HiveDecimalObjectInspector)inputOI). - getPrimitiveJavaObject(parameters[0]); - bf.addString(vDecimal.toString()); + HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector)inputOI). + getPrimitiveWritableObject(parameters[0]); + int startIdx = vDecimal.toBytes(scratchBuffer); + bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx); break; case DATE: DateWritable vDate = ((DateObjectInspector)inputOI). diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java index 1b7de6c..3e6e069 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java @@ -22,8 +22,11 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedExpressions; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorInBloomFilterColDynamicValue; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; @@ -41,6 +44,7 @@ /** * GenericUDF to lookup a value in BloomFilter */ +@VectorizedExpressions({VectorInBloomFilterColDynamicValue.class}) public class GenericUDFInBloomFilter extends GenericUDF { private static final Logger LOG = LoggerFactory.getLogger(GenericUDFInBloomFilter.class); @@ -48,6 +52,7 @@ private transient ObjectInspector bloomFilterObjectInspector; private transient BloomFilter bloomFilter; private transient boolean initializedBloomFilter; + private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { @@ -133,9 +138,10 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException { get(arguments[0].get()); return bloomFilter.testDouble(vDouble); case DECIMAL: - HiveDecimal vDecimal = ((HiveDecimalObjectInspector) valObjectInspector). - getPrimitiveJavaObject(arguments[0].get()); - return bloomFilter.testString(vDecimal.toString()); + HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector) valObjectInspector). + getPrimitiveWritableObject(arguments[0].get()); + int startIdx = vDecimal.toBytes(scratchBuffer); + return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx); case DATE: DateWritable vDate = ((DateObjectInspector) valObjectInspector). 
getPrimitiveWritableObject(arguments[0].get()); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java index 59cb31e..889f00a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.*; import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode; import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFSumLong; @@ -218,4 +219,34 @@ public void testValidateSMBJoinOperator() { Vectorizer vectorizer = new Vectorizer(); Assert.assertTrue(vectorizer.validateMapWorkOperator(map, null, false)); } + + @Test + public void testExprNodeDynamicValue() { + ExprNodeDesc exprNode = new ExprNodeDynamicValueDesc(new DynamicValue("id1", TypeInfoFactory.stringTypeInfo)); + Vectorizer v = new Vectorizer(); + Assert.assertTrue(v.validateExprNodeDesc(exprNode, Mode.FILTER)); + Assert.assertTrue(v.validateExprNodeDesc(exprNode, Mode.PROJECTION)); + } + + @Test + public void testExprNodeBetweenWithDynamicValue() { + ExprNodeDesc notBetween = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, Boolean.FALSE); + ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(String.class, "col1", "table", false); + ExprNodeDesc minExpr = new ExprNodeDynamicValueDesc(new DynamicValue("id1", TypeInfoFactory.stringTypeInfo)); + ExprNodeDesc maxExpr = new ExprNodeDynamicValueDesc(new DynamicValue("id2", TypeInfoFactory.stringTypeInfo)); + + ExprNodeGenericFuncDesc betweenExpr = new ExprNodeGenericFuncDesc(); + GenericUDF betweenUdf = new GenericUDFBetween(); + betweenExpr.setTypeInfo(TypeInfoFactory.booleanTypeInfo); + betweenExpr.setGenericUDF(betweenUdf); + List<ExprNodeDesc> children1 = new ArrayList<ExprNodeDesc>(2); + children1.add(notBetween); + children1.add(colExpr); + children1.add(minExpr); + children1.add(maxExpr); + betweenExpr.setChildren(children1); + + Vectorizer v = new Vectorizer(); + Assert.assertTrue(v.validateExprNodeDesc(betweenExpr, Mode.FILTER)); + } } diff --git a/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q new file mode 100644 index 0000000..e1eefff --- /dev/null +++ b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q @@ -0,0 +1,43 @@ +set hive.compute.query.using.stats=false; +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.optimize.ppd=true; +set hive.ppd.remove.duplicatefilters=true; +set hive.tez.dynamic.partition.pruning=true; +set hive.tez.dynamic.semijoin.reduction=true; +set hive.optimize.metadataonly=false; +set hive.optimize.index.filter=true; + +set hive.vectorized.adaptor.usage.mode=none; +set hive.vectorized.execution.enabled=true; + +-- Create Tables +create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src; +create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100; + +-- single key (int) +EXPLAIN select count(*) from dsrv_big a join dsrv_small b on
(a.key_int = b.key_int); +select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int); + +-- single key (string) +EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str); +select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str); + +-- keys are different type +EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str); +select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str); + +-- multiple tables +EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int; +select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int; + +-- multiple keys +EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int); +select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int); + +-- small table result is empty +EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2'); +select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2'); + +drop table dsrv_big; +drop table dsrv_small; diff --git a/ql/src/test/results/clientpositive/llap/mergejoin.q.out b/ql/src/test/results/clientpositive/llap/mergejoin.q.out index 4ec2a71..6114548 100644 --- a/ql/src/test/results/clientpositive/llap/mergejoin.q.out +++ b/ql/src/test/results/clientpositive/llap/mergejoin.q.out @@ -92,7 +92,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 4 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25) @@ -321,7 +321,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 5 Map Operator Tree: @@ -341,7 +341,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ -378,7 +378,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 4 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242) @@ -1434,7 +1434,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 5 Map Operator Tree: @@ -1453,7 +1453,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ 
-1490,7 +1490,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 4 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242) @@ -1565,7 +1565,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 242 Data size: 24684 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 4 Map Operator Tree: @@ -1594,7 +1594,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ -1631,7 +1631,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 5 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=500) @@ -1831,7 +1831,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 6 Map Operator Tree: @@ -1851,7 +1851,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 7 Map Operator Tree: @@ -1937,7 +1937,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 5 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242) @@ -1949,7 +1949,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) Reducer 8 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25) @@ -2034,7 +2034,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 5 Map Operator Tree: @@ -2054,7 +2054,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col1 (type: string) Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ -2091,7 +2091,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 4 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator 
Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242) @@ -2224,7 +2224,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 9 Map Operator Tree: @@ -2244,7 +2244,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ -2310,7 +2310,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 6 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=508) @@ -2380,7 +2380,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 5 Map Operator Tree: @@ -2400,7 +2400,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col1 (type: string) Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ -2437,7 +2437,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 4 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242) @@ -2524,7 +2524,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 6 Map Operator Tree: @@ -2544,7 +2544,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 7 Map Operator Tree: @@ -2630,7 +2630,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 5 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242) @@ -2642,7 +2642,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) Reducer 8 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25) @@ -2777,7 +2777,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE 
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 9 Map Operator Tree: @@ -2797,7 +2797,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ -2863,7 +2863,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 6 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=508) @@ -2954,10 +2954,10 @@ STAGE PLANS: key expressions: _col0 (type: int), _col1 (type: string) sort order: ++ Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: int) @@ -3016,7 +3016,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 5 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242) diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out index 90055a5..4fb3d12 100644 --- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out @@ -597,7 +597,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: smallint), _col1 (type: smallint), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 5 Map Operator Tree: @@ -618,7 +618,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: smallint) Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: string) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ -660,7 +660,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 4 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=122880) @@ -1089,7 +1089,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: smallint), _col1 (type: smallint), _col2 (type: binary) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 5 Map Operator Tree: @@ -1110,7 +1110,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: smallint) Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: string) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 
Execution mode: llap @@ -1152,7 +1152,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 4 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=245760) diff --git a/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out b/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out index 9fbce7d..7de04a7 100644 --- a/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out +++ b/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out @@ -151,7 +151,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Map 3 Map Operator Tree: @@ -171,7 +171,7 @@ STAGE PLANS: Map-reduce partition columns: _col10 (type: binary) Statistics: Num rows: 100 Data size: 29638 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float), _col5 (type: double), _col6 (type: boolean), _col7 (type: string), _col8 (type: timestamp), _col9 (type: decimal(4,2)) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: vectorized, llap diff --git a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out index 3d087b3..729a84e 100644 --- a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out +++ b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out @@ -2958,7 +2958,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: string) Statistics: Num rows: 1 Data size: 172 Basic stats: COMPLETE Column stats: NONE - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 Execution mode: llap @@ -3024,7 +3024,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 5 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=1) diff --git a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out new file mode 100644 index 0000000..29f2391 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out @@ -0,0 +1,932 @@ +PREHOOK: query: create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:default +PREHOOK: Output: default@dsrv_big +POSTHOOK: query: create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dsrv_big +POSTHOOK: Lineage: dsrv_big.key_int EXPRESSION [(src)src.FieldSchema(name:key, 
type:string, comment:default), ] +POSTHOOK: Lineage: dsrv_big.key_str SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: dsrv_big.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100 +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:default +PREHOOK: Output: default@dsrv_small +POSTHOOK: query: create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100 +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dsrv_small +POSTHOOK: Lineage: dsrv_small.key_int EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: dsrv_small.key_str SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: dsrv_small.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Reducer 5 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: 
int) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) +PREHOOK: type: QUERY +PREHOOK: Input: default@dsrv_big +PREHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dsrv_big +POSTHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +84 +PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Reducer 5 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 
(CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_str (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: key_str is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key_str is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_str (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + 
input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str) +PREHOOK: type: QUERY +PREHOOK: Input: default@dsrv_big +PREHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dsrv_big +POSTHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +84 +PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Reducer 5 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_str (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: key_str is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key_str is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_str (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Reduce Output 
Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str) +PREHOOK: type: QUERY +PREHOOK: Input: default@dsrv_big +PREHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dsrv_big +POSTHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +84 +PREHOOK: query: EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez 
+#### A masked pattern was here #### + Edges: + Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 7 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) + Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_10_b_key_int_min) AND DynamicValue(RS_10_b_key_int_max) and key_int BETWEEN DynamicValue(RS_11_c_key_int_min) AND DynamicValue(RS_11_c_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_10_b_key_int_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_11_c_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_10_b_key_int_min) AND DynamicValue(RS_10_b_key_int_max) and key_int BETWEEN DynamicValue(RS_11_c_key_int_min) AND DynamicValue(RS_11_c_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_10_b_key_int_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_11_c_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 6 + Map Operator Tree: + TableScan + alias: c + filterExpr: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: 
key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + Inner Join 0 to 2 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + 2 _col0 (type: int) + Statistics: Num rows: 1100 Data size: 198000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Reducer 7 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int +PREHOOK: type: QUERY +PREHOOK: Input: default@dsrv_big +PREHOOK: Input: 
default@dsrv_small +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dsrv_big +POSTHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +84 +PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 6 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) + Reducer 6 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: (key_str is not null and key_int is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_str is not null and key_int is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_str (type: string), key_int (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: (key_str is not null and key_int is not null) (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_str is not null and key_int is not null) (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_str (type: string), key_int (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + 
outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Select Operator + expressions: _col1 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string), _col1 (type: int) + 1 _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Reducer 6 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink 
+ +PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int) +PREHOOK: type: QUERY +PREHOOK: Input: default@dsrv_big +PREHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dsrv_big +POSTHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +84 +PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2') +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2') +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Reducer 5 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: ((value) IN ('nonexistent1', 'nonexistent2') and key_int is not null) (type: boolean) + Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((value) IN ('nonexistent1', 'nonexistent2') and key_int is not null) (type: boolean) + Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=29) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: 
COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=29) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2') +PREHOOK: type: QUERY +PREHOOK: Input: default@dsrv_big +PREHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dsrv_big +POSTHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +0 +PREHOOK: query: drop table dsrv_big +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@dsrv_big +PREHOOK: Output: default@dsrv_big +POSTHOOK: query: drop table dsrv_big +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@dsrv_big +POSTHOOK: Output: default@dsrv_big +PREHOOK: query: drop table dsrv_small +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@dsrv_small +PREHOOK: Output: default@dsrv_small +POSTHOOK: query: drop table dsrv_small +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@dsrv_small +POSTHOOK: Output: default@dsrv_small diff --git a/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out b/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out index 850278e..dead5a6 100644 --- a/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out +++ b/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out @@ -166,6 +166,7 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 
Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) + Execution mode: vectorized Local Work: Map Reduce Local Work Reduce Operator Tree:
diff --git a/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
index d44bba8..e9f419d 100644
--- a/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
+++ b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
@@ -292,6 +292,42 @@ public static BloomFilter deserialize(InputStream in) throws IOException {
     }
   }
 
+  // Given a byte array consisting of a serialized BloomFilter, gives the offset (from 0)
+  // for the start of the serialized long values that make up the bitset.
+  // NumHashFunctions (1 byte) + NumBits (4 bytes)
+  public static final int START_OF_SERIALIZED_LONGS = 5;
+
+  /**
+   * Merges BloomFilter bf2 into bf1.
+   * Assumes the two BloomFilters were created with the same size/hash functions and serialized to byte arrays.
+   * @param bf1Bytes
+   * @param bf1Start
+   * @param bf1Length
+   * @param bf2Bytes
+   * @param bf2Start
+   * @param bf2Length
+   */
+  public static void mergeBloomFilterBytes(
+      byte[] bf1Bytes, int bf1Start, int bf1Length,
+      byte[] bf2Bytes, int bf2Start, int bf2Length) {
+    if (bf1Length != bf2Length) {
+      throw new IllegalArgumentException("bf1Length " + bf1Length + " does not match bf2Length " + bf2Length);
+    }
+
+    // Validation on the bitset size/# hash functions.
+    for (int idx = 0; idx < START_OF_SERIALIZED_LONGS; ++idx) {
+      if (bf1Bytes[bf1Start + idx] != bf2Bytes[bf2Start + idx]) {
+        throw new IllegalArgumentException("bf1 NumHashFunctions/NumBits does not match bf2");
+      }
+    }
+
+    // Just bitwise-OR the bits together - size/# functions should be the same,
+    // rest of the data is serialized long values for the bitset which are supposed to be bitwise-ORed.
+    for (int idx = START_OF_SERIALIZED_LONGS; idx < bf1Length; ++idx) {
+      bf1Bytes[bf1Start + idx] |= bf2Bytes[bf2Start + idx];
+    }
+  }
+
   /**
    * Bare metal bit set implementation. For performance reasons, this implementation does not check
    * for index bounds nor expand the bit set size if the specified index is greater than the size.
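The mergeBloomFilterBytes addition above lets two serialized bloom filters be combined without deserializing either one: the first START_OF_SERIALIZED_LONGS (5) bytes encode the number of hash functions (1 byte) and the number of bits (4 bytes), and everything after that is the bitset longs, so a byte-wise OR of the remainder yields the serialized union of the two filters. A minimal usage sketch follows, using only the BloomFilter API visible in this patch; the class name, variable names, and sample keys are illustrative and not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hive.common.util.BloomFilter;

public class MergeSketch {
  public static void main(String[] args) throws Exception {
    // Both filters must be built with the same expectedEntries so that
    // they end up with identical bit sizes and hash function counts.
    BloomFilter bfA = new BloomFilter(10000);
    BloomFilter bfB = new BloomFilter(10000);
    bfA.addString("from-task-a");
    bfB.addString("from-task-b");

    // Serialize both filters to byte arrays.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BloomFilter.serialize(out, bfA);
    byte[] aBytes = out.toByteArray();
    out.reset();
    BloomFilter.serialize(out, bfB);
    byte[] bBytes = out.toByteArray();

    // OR bfB's bitset into bfA's serialized bytes in place.
    BloomFilter.mergeBloomFilterBytes(aBytes, 0, aBytes.length,
        bBytes, 0, bBytes.length);

    // Deserializing the merged bytes yields a filter that matches both keys.
    BloomFilter merged = BloomFilter.deserialize(new ByteArrayInputStream(aBytes));
    System.out.println(merged.testString("from-task-a")); // true
    System.out.println(merged.testString("from-task-b")); // true
  }
}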
diff --git a/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java b/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
index 63c7050..e4ee93a 100644
--- a/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
+++ b/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
@@ -20,8 +20,12 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
 import java.util.Random;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -461,4 +465,125 @@ public void testMerge() {
     assertEquals(true, bf.testString(v2));
     assertEquals(true, bf.testString(v3));
   }
+
+  @Test
+  public void testSerialize() throws Exception {
+    BloomFilter bf1 = new BloomFilter(10000);
+    String[] inputs = {
+        "bloo",
+        "bloom fil",
+        "bloom filter",
+        "cuckoo filter",
+    };
+
+    for (String val : inputs) {
+      bf1.addString(val);
+    }
+
+    // Serialize/deserialize
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomFilter.serialize(bytesOut, bf1);
+    ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytesOut.toByteArray());
+    BloomFilter bf2 = BloomFilter.deserialize(bytesIn);
+
+    for (String val : inputs) {
+      assertEquals("Testing bf1 with " + val, true, bf1.testString(val));
+      assertEquals("Testing bf2 with " + val, true, bf2.testString(val));
+    }
+  }
+
+  @Test
+  public void testMergeBloomFilterBytes() throws Exception {
+    BloomFilter bf1 = new BloomFilter(10000);
+    BloomFilter bf2 = new BloomFilter(10000);
+
+    String[] inputs1 = {
+        "bloo",
+        "bloom fil",
+        "bloom filter",
+        "cuckoo filter",
+    };
+
+    String[] inputs2 = {
+        "2_bloo",
+        "2_bloom fil",
+        "2_bloom filter",
+        "2_cuckoo filter",
+    };
+
+    for (String val : inputs1) {
+      bf1.addString(val);
+    }
+    for (String val : inputs2) {
+      bf2.addString(val);
+    }
+
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomFilter.serialize(bytesOut, bf1);
+    byte[] bf1Bytes = bytesOut.toByteArray();
+    bytesOut.reset();
+    BloomFilter.serialize(bytesOut, bf2);
+    byte[] bf2Bytes = bytesOut.toByteArray();
+
+    // Merge bytes
+    BloomFilter.mergeBloomFilterBytes(
+        bf1Bytes, 0, bf1Bytes.length,
+        bf2Bytes, 0, bf2Bytes.length);
+
+    // Deserialize and test
+    ByteArrayInputStream bytesIn = new ByteArrayInputStream(bf1Bytes, 0, bf1Bytes.length);
+    BloomFilter bfMerged = BloomFilter.deserialize(bytesIn);
+    // All values should pass test
+    for (String val : inputs1) {
+      assertEquals("Testing bfMerged with " + val, true, bfMerged.testString(val));
+    }
+    for (String val : inputs2) {
+      assertEquals("Testing bfMerged with " + val, true, bfMerged.testString(val));
+    }
+  }
+
+  @Test
+  public void testMergeBloomFilterBytesFailureCases() throws Exception {
+    BloomFilter bf1 = new BloomFilter(1000);
+    BloomFilter bf2 = new BloomFilter(200);
+    // Create bloom filter with same number of bits, but different # hash functions
+    ArrayList<Long> bits = new ArrayList<Long>();
+    for (int idx = 0; idx < bf1.getBitSet().length; ++idx) {
+      bits.add(0L);
+    }
+    BloomFilter bf3 = new BloomFilter(bits, bf1.getBitSize(), bf1.getNumHashFunctions() + 1);
+
+    // Serialize to bytes
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomFilter.serialize(bytesOut, bf1);
+    byte[] bf1Bytes = bytesOut.toByteArray();
+
+    bytesOut.reset();
+    BloomFilter.serialize(bytesOut, bf2);
+    byte[] bf2Bytes = bytesOut.toByteArray();
+
+    bytesOut.reset();
+    BloomFilter.serialize(bytesOut, bf3);
+    byte[] bf3Bytes = bytesOut.toByteArray();
+
+    try {
+      // this should fail
+      BloomFilter.mergeBloomFilterBytes(
+          bf1Bytes, 0, bf1Bytes.length,
+          bf2Bytes, 0, bf2Bytes.length);
+      Assert.fail("Expected exception not encountered");
+    } catch (IllegalArgumentException err) {
+      // expected
+    }
+
+    try {
+      // this should fail
+      BloomFilter.mergeBloomFilterBytes(
+          bf1Bytes, 0, bf1Bytes.length,
+          bf3Bytes, 0, bf3Bytes.length);
+      Assert.fail("Expected exception not encountered");
+    } catch (IllegalArgumentException err) {
+      // expected
+    }
+  }
 }