diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 3f538b3..0bc8d84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -364,6 +364,8 @@ system.registerGenericUDF("restrict_information_schema", GenericUDFRestrictInformationSchema.class); system.registerGenericUDF("current_authorizer", GenericUDFCurrentAuthorizer.class); + system.registerGenericUDF("surrogate_key", GenericUDFSurrogateKey.class); + system.registerGenericUDF("isnull", GenericUDFOPNull.class); system.registerGenericUDF("isnotnull", GenericUDFOPNotNull.class); system.registerGenericUDF("istrue", GenericUDFOPTrue.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index f24add3..20eed6e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentUser; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFSurrogateKey; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.io.DateWritableV2; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; @@ -832,7 +833,8 @@ else if(defaultValExpr instanceof ExprNodeGenericFuncDesc){ if(defFunc.getGenericUDF() instanceof GenericUDFOPNull || defFunc.getGenericUDF() instanceof GenericUDFCurrentTimestamp || defFunc.getGenericUDF() instanceof GenericUDFCurrentDate - || defFunc.getGenericUDF() instanceof GenericUDFCurrentUser){ + || defFunc.getGenericUDF() instanceof GenericUDFCurrentUser + || defFunc.getGenericUDF() instanceof GenericUDFSurrogateKey){ return true; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 6d7e63e..cb8924e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7705,6 +7705,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) fileSinkDesc, fsRS, input), inputRR); handleLineage(ltd, output); + setLtdToFunctions(ltd, input); if (LOG.isDebugEnabled()) { LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " @@ -7949,6 +7950,20 @@ private void handleLineage(LoadTableDesc ltd, Operator output) } } + private void setLtdToFunctions(LoadTableDesc ltd, Operator input) throws SemanticException { + Map columnExprMap = input.getConf().getColumnExprMap(); + if (ltd == null || columnExprMap == null) { + return; + } + + for (ExprNodeDesc desc : columnExprMap.values()) { + if (desc instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc engfDesc = (ExprNodeGenericFuncDesc)desc; + engfDesc.getGenericUDF().setLoadTableDesc(ltd); + } + } + } + private WriteEntity generateTableWriteEntity(String dest, Table dest_tab, Map partSpec, LoadTableDesc ltd, DynamicPartitionCtx dpCtx, boolean isNonNativeTable) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java index 0d8d659..d8d8409 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritableV2; @@ -136,6 +137,14 @@ public void configure(MapredContext context) { } /** + * Additional setup with the description of the table loaded by this function. + * + * @param loadTableDesc the description of the table loaded by this function. + */ + public void setLoadTableDesc(LoadTableDesc loadTableDesc) { + } + + /** * Initialize this GenericUDF. Additionally, if the arguments are constant * and the function is eligible to be folded, then the constant value * returned by this UDF will be computed and stored in the diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFSurrogateKey.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFSurrogateKey.java new file mode 100644 index 0000000..c999132 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFSurrogateKey.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.exec.tez.TezContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.LoadTableDesc; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector; +import org.apache.hadoop.io.LongWritable; + +// This function is not a deterministic function, and not a runtime constant. +// The return value is sequence within a query with a unique staring point based on write_id and task_id +@UDFType(deterministic = false) +public class GenericUDFSurrogateKey extends GenericUDF { + private static final int DEFAULT_WRITE_ID_BITS = 24; + private static final int DEFAULT_TASK_ID_BITS = 16; + private static final int DEFAULT_ROW_ID_BITS = 24; + + private int writeIdBits; + private int taskIdBits; + private int rowIdBits; + + private long maxWriteId; + private long maxTaskId; + private long maxRowId; + + private long writeId = -1; + private long lastTaskId = -1; + private long lastRowId = -1; + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { + if (arguments.length == 0) { + writeIdBits = DEFAULT_WRITE_ID_BITS; + taskIdBits = DEFAULT_TASK_ID_BITS; + rowIdBits = DEFAULT_ROW_ID_BITS; + } else if (arguments.length == 2) { + for (int i = 0; i < 2; i++) + if (arguments[i].getCategory() != Category.PRIMITIVE) { + throw new UDFArgumentTypeException(0, + "SURROGATE_KEY input only takes primitive types, got " + arguments[i].getTypeName()); + } + + writeIdBits = ((WritableConstantIntObjectInspector)arguments[0]).getWritableConstantValue().get(); + taskIdBits = ((WritableConstantIntObjectInspector)arguments[1]).getWritableConstantValue().get(); + rowIdBits = 64 - (writeIdBits + taskIdBits); + + if (writeIdBits < 1 || writeIdBits > 62) { + throw new UDFArgumentException("Write ID bits must be between 1 and 62 (value: " + writeIdBits + ")"); + } + if (taskIdBits < 1 || taskIdBits > 62) { + throw new UDFArgumentException("Task ID bits must be between 1 and 62 (value: " + taskIdBits + ")"); + } + if (writeIdBits + taskIdBits > 63) { + throw new UDFArgumentException("Write ID bits + Task ID bits must be less than 63 (value: " + + (writeIdBits + taskIdBits) + ")"); + } + } else { + throw new UDFArgumentLengthException( + "The function SURROGATE_KEY takes 0 or 2 integer arguments (write id bits, taks id bits), but found " + arguments.length); + } + + maxWriteId = (1 << writeIdBits) - 1; + maxTaskId = (1 << taskIdBits) - 1; + maxRowId = (1 << rowIdBits) - 1; + + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; + } + + @Override + public void setLoadTableDesc(LoadTableDesc loadTableDesc) { + if (loadTableDesc != null) { + writeId = loadTableDesc.getWriteId(); + } + } + + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { + if (writeId == -1) { + throw new HiveException("Could not obtain Write ID for the surrogate_key function"); + } + + long taskId = ((TezContext)MapredContext.get()).getTezProcessorContext().getTaskIndex(); + long rowId = (lastTaskId != -1 && taskId == lastTaskId) ? lastRowId + 1 : 0; + + if (writeId > maxWriteId) { + throw new HiveException(String.format("Write ID is out of range (%d bits) in surrogate_key", writeIdBits)); + } + if (taskId > maxTaskId) { + throw new HiveException(String.format("Task ID is out of range (%d bits) in surrogate_key", taskIdBits)); + } + if (rowId > maxRowId) { + throw new HiveException(String.format("Row ID is out of range (%d bits) in surrogate_key", rowIdBits)); + } + + long uniqueId = (writeId << (taskIdBits + rowIdBits)) + (taskId << rowIdBits) + rowId; + + lastTaskId = taskId; + lastRowId = rowId; + + return new LongWritable(uniqueId); + } + + @Override + public String getDisplayString(String[] children) { + return "SURROGATE_KEY()"; + } +}