diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 9c90230..de4965f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -31,6 +31,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprAndExpr;
@@ -41,6 +43,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.expressions.SelectColumnIsNull;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.SelectColumnIsTrue;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorUDF;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorUDFArgDesc;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorUDFUnixTimeStampLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
@@ -139,6 +143,15 @@ private int getInputColumnIndex(String name) {
     return columnMap.get(name);
   }
 }
+
+  /* Return true if we are running in the planner, and false if we
+   * are running in a task.
+   */
+  private boolean isPlanner() {
+
+    // This relies on the behavior that columnMap is null in the planner.
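+    // (isCustomUDF() depends on this distinction: the FunctionRegistry
+    // lookup is only performed at plan time, and tasks instead read the
+    // flag that the planner recorded on the expression node.)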
+    return columnMap == null;
+  }

   private class OutputColumnManager {
     private final int initialOutputCol;
@@ -249,8 +262,17 @@ public VectorExpression getVectorExpression(ExprNodeDesc exprDesc) throws HiveEx
       ve = getVectorExpression((ExprNodeColumnDesc) exprDesc);
     } else if (exprDesc instanceof ExprNodeGenericFuncDesc) {
       ExprNodeGenericFuncDesc expr = (ExprNodeGenericFuncDesc) exprDesc;
-      ve = getVectorExpression(expr.getGenericUDF(),
-          expr.getChildExprs());
+      if (isCustomUDF(expr)) {
+        if (isPlanner()) {
+
+          // Record that this is a custom UDF, for later reference in map-reduce tasks.
+          expr.setIsCustomUDF(true);
+        }
+        ve = getCustomUDFExpression(expr);
+      } else {
+        ve = getVectorExpression(expr.getGenericUDF(),
+            expr.getChildExprs());
+      }
     } else if (exprDesc instanceof ExprNodeConstantDesc) {
       ve = getConstantVectorExpression((ExprNodeConstantDesc) exprDesc);
     }
@@ -260,6 +282,35 @@ public VectorExpression getVectorExpression(ExprNodeDesc exprDesc) throws HiveEx
     return ve;
   }

+  // Return true if the expr is a custom UDF (i.e. not built in).
+  private boolean isCustomUDF(ExprNodeGenericFuncDesc expr) {
+    if (isPlanner()) {
+      GenericUDF udf = expr.getGenericUDF();
+      GenericUDFBridge udfBridge;
+      if (udf instanceof GenericUDFBridge) {
+        udfBridge = (GenericUDFBridge) udf;
+      } else {
+        return false;
+      }
+      String udfName = udfBridge.getUdfName();
+
+      // Kluge for now to enable more debugging progress.
+      // TODO fix this
+      /*
+      if (udfName.equalsIgnoreCase("initcaps")) {
+        return true;
+      }
+      */
+      FunctionInfo funcInfo = FunctionRegistry.getFunctionInfo(udfName);
+      boolean isNativeFunc = funcInfo.isNative();
+      return !isNativeFunc;
+    } else {
+
+      // We're running in a task, not the planner, so the information
+      // was recorded in the expr by the planner.
+      return expr.getIsCustomUDF();
+    }
+  }
+
   /**
    * Handles only the special case of unary operators on a constant.
    * @param exprDesc
@@ -466,6 +517,112 @@ private VectorExpression getVectorExpression(GenericUDFBridge udf,
     throw new HiveException("Udf: "+udf.getClass().getSimpleName()+", is not supported");
   }
+
+  /*
+   * Return vector expression for a custom (i.e. not built-in) UDF.
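+   *
+   * Each child expression is classified as either a constant (converted to
+   * its Writable form once, up front, by VectorUDFArgDesc.setConstant) or a
+   * variable (a column, or a nested expression whose output column is read
+   * per row). For a hypothetical call my_udf(col1, 'abc', col2 + 1),
+   * arguments 0 and 2 are variables and argument 1 is a constant.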
+   */
+  private VectorExpression getCustomUDFExpression(ExprNodeGenericFuncDesc expr)
+      throws HiveException {
+
+    GenericUDFBridge udfBridge = (GenericUDFBridge) expr.getGenericUDF();
+    List<ExprNodeDesc> childExprList = expr.getChildExprs();
+
+    // Argument descriptors.
+    VectorUDFArgDesc[] argDescs = new VectorUDFArgDesc[expr.getChildExprs().size()];
+    for (int i = 0; i < argDescs.length; i++) {
+      argDescs[i] = new VectorUDFArgDesc();
+    }
+
+    // Positions of constant arguments (0-based).
+    ArrayList<Integer> constantArgPositions = new ArrayList<Integer>();
+
+    // Positions of variable arguments (columns or non-constant expressions).
+    ArrayList<Integer> variableArgPositions = new ArrayList<Integer>();
+
+    // Column numbers of the batch corresponding to variable arguments.
+    ArrayList<Integer> variableArgColumnNums = new ArrayList<Integer>();
+
+    // Column numbers of the batch corresponding to expression-result arguments.
+    ArrayList<Integer> exprResultColumnNums = new ArrayList<Integer>();
+
+    // Prepare children.
+    ArrayList<VectorExpression> vectorExprs = new ArrayList<VectorExpression>();
+
+    for (int i = 0; i < childExprList.size(); i++) {
+      ExprNodeDesc child = childExprList.get(i);
+      if (child instanceof ExprNodeGenericFuncDesc) {
+        VectorExpression e = getVectorExpression(child);
+        vectorExprs.add(e);
+        variableArgPositions.add(i);
+        variableArgColumnNums.add(e.getOutputColumn());
+        exprResultColumnNums.add(e.getOutputColumn());
+        argDescs[i].setVariable(e.getOutputColumn());
+      } else if (child instanceof ExprNodeColumnDesc) {
+        int colNum = getInputColumnIndex(((ExprNodeColumnDesc) child).getColumn());
+        variableArgPositions.add(i);
+        variableArgColumnNums.add(colNum);
+        argDescs[i].setVariable(colNum);
+      } else if (child instanceof ExprNodeConstantDesc) {
+
+        // This is a constant.
+        constantArgPositions.add(i);
+        argDescs[i].setConstant((ExprNodeConstantDesc) child);
+      } else {
+        throw new HiveException("Unable to vectorize Custom UDF");
+      }
+    }
+
+    // Allocate the output column and get its column number.
+    int outputCol = -1;
+    String resultColVectorType;
+    String resultType = expr.getTypeInfo().getTypeName();
+    if (resultType.equalsIgnoreCase("string")) {
+      resultColVectorType = "String";
+    } else if (isIntFamily(resultType)) {
+      resultColVectorType = "Long";
+    } else if (isFloatFamily(resultType)) {
+      resultColVectorType = "Double";
+    } else {
+      throw new HiveException("Unable to vectorize due to unsupported custom UDF return type "
+          + resultType);
+    }
+    outputCol = ocm.allocateOutputColumn(resultColVectorType);
+
+    // Make the vectorized operator.
+    VectorExpression ve;
+    ve = new VectorUDF(expr, variableArgColumnNums, outputCol, resultColVectorType,
+        constantArgPositions, variableArgPositions, argDescs);
+
+    // Set the child expressions.
+    VectorExpression[] childVEs = null;
+    if (exprResultColumnNums.size() != 0) {
+      childVEs = new VectorExpression[exprResultColumnNums.size()];
+      for (int i = 0; i < childVEs.length; i++) {
+        childVEs[i] = vectorExprs.get(i);
+      }
+    }
+    ve.setChildExpressions(childVEs);
+
+    // Free output columns if inputs have non-leaf expression trees.
+    for (Integer i : exprResultColumnNums) {
+      ocm.freeOutputColumn(i);
+    }
+    return ve;
+  }
+
+  // Return true if this is any kind of float.
+  public static boolean isFloatFamily(String resultType) {
+    return resultType.equalsIgnoreCase("double")
+        || resultType.equalsIgnoreCase("float");
+  }
+
+  // Return true if this data type is handled in the output vector as an integer.
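+  // (boolean is included because boolean values are stored as 0/1 in a
+  // LongColumnVector; see setOutputCol in VectorUDF.)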
+  public static boolean isIntFamily(String resultType) {
+    return resultType.equalsIgnoreCase("tinyint")
+        || resultType.equalsIgnoreCase("smallint")
+        || resultType.equalsIgnoreCase("int")
+        || resultType.equalsIgnoreCase("bigint")
+        || resultType.equalsIgnoreCase("boolean");
+  }

   /* Return a unary string vector expression. This is used for functions like
    * UPPER() and LOWER().
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDF.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDF.java
new file mode 100644
index 0000000..2404007
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDF.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector;
+import org.apache.hadoop.io.Text;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+/**
+ * A VectorUDF is a vectorized expression for invoking a custom
+ * UDF on zero or more input vectors that serve as the function arguments.
+ */
+public class VectorUDF extends VectorExpression {
+
+  ExprNodeGenericFuncEvaluator genFuncEvaluator;
+
+  // from ExprNodeGenericFuncEvaluator
+  GenericUDF genericUDF;
+  GenericUDF.DeferredObject[] deferredChildren;
+
+  ArrayList<Integer> inputColumns;
+  int outputColumn;
+  ArrayList<Integer> constantArgNumbers; // 0-based numbers of constant function arguments
+  ArrayList<Object> constantArgs;        // actual values of constant arguments
+  ArrayList<String> constantArgTypes;
+  ArrayList<Integer> variableArgNumbers; // 0-based numbers of variable arguments
+  ObjectInspector outputOI;
+  ObjectInspector[] childrenOIs;
+  String resultType;
+  VectorExpressionWriter[] writers;
+  VectorUDFArgDesc[] argDescs;
+
+  public VectorUDF(
+      ExprNodeGenericFuncDesc expr,
+      ArrayList<Integer> inputColumns,
+      int outputColumn,
+      String resultType,
+      ArrayList<Integer> constantArgNumbers,
+      ArrayList<Integer> variableArgNumbers,
+      VectorUDFArgDesc[] argDescs) throws HiveException {
+
+    // Consider using the ExprNodeGenericFuncEvaluator to get
+    // the output object inspector.
+    this.genFuncEvaluator = new ExprNodeGenericFuncEvaluator(expr);
+
+    this.genericUDF = expr.getGenericUDF();
+    this.inputColumns = inputColumns;
+    this.outputColumn = outputColumn;
+    this.constantArgNumbers = constantArgNumbers;
+    this.resultType = resultType;
+    this.argDescs = argDescs;
+    deferredChildren = new GenericUDF.DeferredObject[expr.getChildExprs().size()];
+    childrenOIs = new ObjectInspector[expr.getChildExprs().size()];
+    writers = VectorExpressionWriterFactory.getExpressionWriters(expr.getChildExprs());
+    for (int i = 0; i < childrenOIs.length; i++) {
+      childrenOIs[i] = writers[i].getObjectInspector();
+    }
+    // outputOI = genericUDF.initializeAndFoldConstants(childrenOIs);
+    outputOI = VectorExpressionWriterFactory.genVectorExpressionWritable(expr)
+        .getObjectInspector();
+
+    this.variableArgNumbers = variableArgNumbers;
+
+    // TODO -- see if this works
+    this.genericUDF.initialize(childrenOIs);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+
+    if (childExpressions != null) {
+      super.evaluateChildren(batch);
+    }
+
+    int[] sel = batch.selected;
+    int n = batch.size;
+    ColumnVector outV = batch.cols[outputColumn];
+
+    // If the output column is of type string, initialize the buffer to receive data.
+    if (outV instanceof BytesColumnVector) {
+      ((BytesColumnVector) outV).initBuffer();
+    }
+
+    if (n == 0) {
+
+      // Nothing to do.
+      return;
+    }
+
+    batch.cols[outputColumn].noNulls = true;
+
+    /* isRepeating is not optimized in the current implementation: the function
+     * is evaluated for every row, even if isRepeating is true for an input
+     * column of the batch. This could be optimized in the future, particularly
+     * for single-argument functions, but optimizing the isRepeating logic for
+     * this general-purpose, multi-argument adaptor was judged too complex to
+     * attempt here.
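+     * (For example, a batch in which every argument column is repeating
+     * could in principle be evaluated once, with the output column marked
+     * repeating as well.)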
+     */
+    batch.cols[outputColumn].isRepeating = false;
+
+    if (batch.selectedInUse) {
+      for (int j = 0; j != n; j++) {
+        int i = sel[j];
+        setResult(i, batch);
+      }
+    } else {
+      for (int i = 0; i != n; i++) {
+        setResult(i, batch);
+      }
+    }
+  }
+
+  /* Calculate the function result for row i of the batch and
+   * set entry i of the output column vector to the result.
+   */
+  private void setResult(int i, VectorizedRowBatch b) {
+
+    // Get the arguments for this row.
+    for (int j = 0; j < argDescs.length; j++) {
+      deferredChildren[j] = argDescs[j].getDeferredJavaObject(i, b, j, writers);
+    }
+
+    // Call the function. If evaluation fails for this row, the result is
+    // treated as NULL.
+    Object result;
+    try {
+      result = genericUDF.evaluate(deferredChildren);
+    } catch (HiveException e) {
+      result = null;
+    }
+
+    // Set the output column vector entry.
+    if (result == null) {
+      b.cols[outputColumn].noNulls = false;
+      b.cols[outputColumn].isNull[i] = true;
+    } else {
+      b.cols[outputColumn].isNull[i] = false;
+      setOutputCol(b.cols[outputColumn], i, result);
+    }
+  }
+
+  private void setOutputCol(ColumnVector colVec, int i, Object value) {
+
+    // Depending on the output type, get the value, cast the result to the
+    // correct type if needed, and assign the result into the output vector.
+    if (outputOI instanceof WritableStringObjectInspector) {
+      BytesColumnVector bv = (BytesColumnVector) colVec;
+      Text t = ((WritableStringObjectInspector) outputOI).getPrimitiveWritableObject(value);
+
+      // Use t.getLength(), not t.getBytes().length: the backing array of a
+      // Text may be larger than the valid data it holds.
+      bv.setVal(i, t.getBytes(), 0, t.getLength());
+    } else if (outputOI instanceof WritableIntObjectInspector) {
+      LongColumnVector lv = (LongColumnVector) colVec;
+      long l = ((WritableIntObjectInspector) outputOI).get(value);
+      lv.vector[i] = l;
+    } else if (outputOI instanceof WritableLongObjectInspector) {
+      LongColumnVector lv = (LongColumnVector) colVec;
+      long l = ((WritableLongObjectInspector) outputOI).get(value);
+      lv.vector[i] = l;
+    } else if (outputOI instanceof WritableDoubleObjectInspector) {
+      DoubleColumnVector dv = (DoubleColumnVector) colVec;
+      double d = ((WritableDoubleObjectInspector) outputOI).get(value);
+      dv.vector[i] = d;
+    } else if (outputOI instanceof WritableFloatObjectInspector) {
+      DoubleColumnVector dv = (DoubleColumnVector) colVec;
+      double d = ((WritableFloatObjectInspector) outputOI).get(value);
+      dv.vector[i] = d;
+    } else if (outputOI instanceof WritableShortObjectInspector) {
+      LongColumnVector lv = (LongColumnVector) colVec;
+      long l = ((WritableShortObjectInspector) outputOI).get(value);
+      lv.vector[i] = l;
+    } else if (outputOI instanceof WritableByteObjectInspector) {
+      LongColumnVector lv = (LongColumnVector) colVec;
+      long l = ((WritableByteObjectInspector) outputOI).get(value);
+      lv.vector[i] = l;
+    } else if (outputOI instanceof WritableTimestampObjectInspector) {
+      LongColumnVector lv = (LongColumnVector) colVec;
+      Timestamp ts = ((WritableTimestampObjectInspector) outputOI).getPrimitiveJavaObject(value);
+
+      /* Calculate the number of nanoseconds since the epoch as a long integer.
+       * By convention, that is how Timestamp values are operated on in a vector.
+       * getTime() already includes the milliseconds of the fractional second,
+       * so only the sub-millisecond part of getNanos() is added here, to avoid
+       * counting the milliseconds twice.
+       */
+      long l = ts.getTime() * 1000000   // Scale milliseconds to nanoseconds.
+          + ts.getNanos() % 1000000;    // Add the sub-millisecond nanoseconds.
+      lv.vector[i] = l;
+    } else if (outputOI instanceof WritableBooleanObjectInspector) {
+      LongColumnVector lv = (LongColumnVector) colVec;
+      long l = ((WritableBooleanObjectInspector) outputOI).get(value) ? 1 : 0;
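+      // By convention, booleans are stored in a LongColumnVector as 0/1,
+      // which is why isIntFamily() in VectorizationContext includes boolean.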
+      lv.vector[i] = l;
+    } else {
+      throw new RuntimeException("Unhandled object type " + outputOI.getTypeName());
+    }
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return outputColumn;
+  }
+
+  @Override
+  public String getOutputType() {
+    return resultType;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFArgDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFArgDesc.java
new file mode 100644
index 0000000..3221e83
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFArgDesc.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Descriptor for function argument.
+ */
+public class VectorUDFArgDesc {
+  boolean isConstant;
+  int columnNum;
+  ExprNodeConstantDesc constExpr;
+  GenericUDF.DeferredJavaObject constObjVal;
+
+  public VectorUDFArgDesc() {
+  }
+
+  /**
+   * Set this argument to a constant value extracted from the
+   * expression tree. Prepare the constant for use when the
+   * function is called.
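+   * For a hypothetical call my_udf(col, 'abc'), the literal 'abc' is
+   * converted to its Writable form here, once, and then reused for
+   * every row of every batch.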
+   */
+  public void setConstant(ExprNodeConstantDesc expr) {
+    isConstant = true;
+    constExpr = expr;
+
+    PrimitiveCategory pc = ((PrimitiveTypeInfo) constExpr.getTypeInfo())
+        .getPrimitiveCategory();
+
+    // Convert the constant from its Java representation to the Writable
+    // representation that the UDF will consume.
+    Object writableValue = PrimitiveObjectInspectorFactory
+        .getPrimitiveJavaObjectInspector(pc).getPrimitiveWritableObject(
+            constExpr.getValue());
+
+    constObjVal = new GenericUDF.DeferredJavaObject(writableValue);
+  }
+
+  public void setVariable(int i) {
+    columnNum = i;
+  }
+
+  public boolean isConstant() {
+    return isConstant;
+  }
+
+  public boolean isVariable() {
+    return !isConstant;
+  }
+
+  public int getColumn() {
+    return columnNum;
+  }
+
+  public DeferredObject getDeferredJavaObject(int row, VectorizedRowBatch b, int argPosition,
+      VectorExpressionWriter[] writers) {
+
+    if (isConstant()) {
+      return this.constObjVal;
+    } else {
+
+      // Get the input column for this argument.
+      ColumnVector cv = b.cols[columnNum];
+
+      // Write the value for this row to an object that the UDF can inspect.
+      Object o;
+      try {
+        o = writers[argPosition].writeValue(cv, row);
+      } catch (HiveException e) {
+        throw new RuntimeException("Could not convert UDF argument to a Java object", e);
+      }
+      return new GenericUDF.DeferredJavaObject(o);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
index eefc726..2c6c220 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
@@ -70,6 +70,13 @@
   //Is this an expression that should perform a comparison for sorted searches
   private boolean isSortedExpr;

+  /* After the planner runs and invokes VectorizationContext logic,
+   * this will be true if this node is for a custom (non-built-in) UDF,
+   * and false otherwise. Map-reduce tasks can therefore read this value
+   * to determine whether this is a custom UDF.
+   */
+  private boolean isCustomUDF;
+
   public ExprNodeGenericFuncDesc() {
   }
@@ -285,4 +292,12 @@ public boolean isSortedExpr() {
   public void setSortedExpr(boolean isSortedExpr) {
     this.isSortedExpr = isSortedExpr;
   }
+
+  public void setIsCustomUDF(boolean newValue) {
+    this.isCustomUDF = newValue;
+  }
+
+  public boolean getIsCustomUDF() {
+    return this.isCustomUDF;
+  }
 }