diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 3f538b3..0bc8d84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -364,6 +364,8 @@ system.registerGenericUDF("restrict_information_schema", GenericUDFRestrictInformationSchema.class); system.registerGenericUDF("current_authorizer", GenericUDFCurrentAuthorizer.class); + system.registerGenericUDF("surrogate_key", GenericUDFSurrogateKey.class); + system.registerGenericUDF("isnull", GenericUDFOPNull.class); system.registerGenericUDF("isnotnull", GenericUDFOPNotNull.class); system.registerGenericUDF("istrue", GenericUDFOPTrue.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 3309b9b..b655ab1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentUser; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFSurrogateKey; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.io.DateWritableV2; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; @@ -832,7 +833,8 @@ else if(defaultValExpr instanceof ExprNodeGenericFuncDesc){ if(defFunc.getGenericUDF() instanceof GenericUDFOPNull || defFunc.getGenericUDF() instanceof GenericUDFCurrentTimestamp || defFunc.getGenericUDF() instanceof GenericUDFCurrentDate - || defFunc.getGenericUDF() instanceof GenericUDFCurrentUser){ + || defFunc.getGenericUDF() instanceof GenericUDFCurrentUser + || defFunc.getGenericUDF() 
instanceof GenericUDFSurrogateKey){
         return true;
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 6d7e63e..2539d4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -238,11 +238,13 @@
 import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFArray;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCardinalityViolation;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFSurrogateKey;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFInline;
 import org.apache.hadoop.hive.ql.util.ResourceDownloader;
@@ -7705,6 +7707,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
         fileSinkDesc, fsRS, input), inputRR);
 
     handleLineage(ltd, output);
+    setWriteIdForSurrogateKeys(ltd, input);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
@@ -7949,6 +7952,31 @@ private void handleLineage(LoadTableDesc ltd, Operator output)
     }
   }
 
+  /**
+   * Passes the write ID of the load target to every surrogate_key() call that appears
+   * directly as a value in the column expression map of the given operator.
+   *
+   * Expressions nested inside other expressions are not visited.
+   *
+   * @param ltd load descriptor that supplies the write ID; nothing is done when it is null
+   * @param input operator whose column expression map is scanned; nothing is done when the map is null
+   */
+  private void setWriteIdForSurrogateKeys(LoadTableDesc ltd, Operator input) throws SemanticException {
+    Map<String, ExprNodeDesc> columnExprMap = input.getConf().getColumnExprMap();
+    if (ltd == null || columnExprMap == null) {
+      return;
+    }
+
+    for (ExprNodeDesc desc : columnExprMap.values()) {
+      if (desc instanceof ExprNodeGenericFuncDesc) {
+        GenericUDF genericUDF = ((ExprNodeGenericFuncDesc)desc).getGenericUDF();
+        if (genericUDF instanceof GenericUDFSurrogateKey) {
+          ((GenericUDFSurrogateKey)genericUDF).setWriteId(ltd.getWriteId());
+        }
+      }
+    }
+  }
+
   private WriteEntity generateTableWriteEntity(String dest, Table dest_tab,
                                                Map partSpec, LoadTableDesc ltd,
                                                DynamicPartitionCtx dpCtx, boolean isNonNativeTable)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFSurrogateKey.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFSurrogateKey.java
new file mode 100644
index 0000000..1326652
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFSurrogateKey.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * This function is not a deterministic function, and not a runtime constant.
+ * The return value is a sequence within a query with a unique starting point based on write_id and task_id.
+ *
+ * <p>A key is laid out, from the most to the least significant bits, as write ID, task ID and
+ * row ID. Without arguments the fields are 24, 16 and 24 bits wide. With two constant integer
+ * arguments (write ID bits, task ID bits) the row ID takes the remaining
+ * {@code 64 - (writeIdBits + taskIdBits)} bits.
+ */
+@UDFType(deterministic = false)
+public class GenericUDFSurrogateKey extends GenericUDF {
+  private static final int DEFAULT_WRITE_ID_BITS = 24;
+  private static final int DEFAULT_TASK_ID_BITS = 16;
+  private static final int DEFAULT_ROW_ID_BITS = 24;
+
+  // Width of each field of the generated key; the three widths add up to 64.
+  private int writeIdBits;
+  private int taskIdBits;
+  private int rowIdBits;
+
+  // Assigned by configure() when it is handed a TezContext; null otherwise.
+  private TezContext context;
+
+  // Largest value that fits in the corresponding field.
+  private long maxWriteId;
+  private long maxTaskId;
+  private long maxRowId;
+
+  // -1 marks "not set yet" for each of the three.
+  private long writeId = -1;
+  private long lastTaskId = -1;
+  private long lastRowId = -1;
+
+  /**
+   * Reads the bit widths of the key fields and derives the largest value each field can hold.
+   *
+   * @param arguments either empty (default widths) or two constant int inspectors holding the
+   *                  write ID bits and the task ID bits
+   * @return the writable long object inspector
+   * @throws UDFArgumentException if the argument count is not 0 or 2, an argument is not a
+   *         non-null constant int, or a width is out of range
+   */
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    if (arguments.length == 0) {
+      writeIdBits = DEFAULT_WRITE_ID_BITS;
+      taskIdBits = DEFAULT_TASK_ID_BITS;
+      rowIdBits = DEFAULT_ROW_ID_BITS;
+    } else if (arguments.length == 2) {
+      writeIdBits = constantBits(arguments, 0);
+      taskIdBits = constantBits(arguments, 1);
+
+      if (writeIdBits < 1 || writeIdBits > 62) {
+        throw new UDFArgumentException("Write ID bits must be between 1 and 62 (value: " + writeIdBits + ")");
+      }
+      if (taskIdBits < 1 || taskIdBits > 62) {
+        throw new UDFArgumentException("Task ID bits must be between 1 and 62 (value: " + taskIdBits + ")");
+      }
+      if (writeIdBits + taskIdBits > 63) {
+        // A sum of exactly 63 is accepted: it leaves one bit for the row ID.
+        throw new UDFArgumentException("Write ID bits + Task ID bits must not exceed 63 (value: " +
+            (writeIdBits + taskIdBits) + ")");
+      }
+      rowIdBits = 64 - (writeIdBits + taskIdBits);
+    } else {
+      throw new UDFArgumentLengthException(
+          "The function SURROGATE_KEY takes 0 or 2 integer arguments (write id bits, task id bits), but found " +
+              arguments.length);
+    }
+
+    // The shifts must be done on a long: an int shift only uses the low five bits of the
+    // distance, so (1 << 44) would evaluate to (1 << 12).
+    maxWriteId = (1L << writeIdBits) - 1;
+    maxTaskId = (1L << taskIdBits) - 1;
+    maxRowId = (1L << rowIdBits) - 1;
+
+    return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+  }
+
+  /**
+   * Extracts the int value of the constant argument at the given position.
+   *
+   * @param arguments the inspectors passed to initialize
+   * @param index position of the argument to read; also reported in the exception
+   * @return the constant's value
+   * @throws UDFArgumentTypeException if the argument is not primitive, is not a constant int,
+   *         or is a null constant
+   */
+  private static int constantBits(ObjectInspector[] arguments, int index) throws UDFArgumentTypeException {
+    ObjectInspector argument = arguments[index];
+    if (argument.getCategory() != Category.PRIMITIVE) {
+      throw new UDFArgumentTypeException(index,
+          "SURROGATE_KEY input only takes primitive types, got " + argument.getTypeName());
+    }
+    if (!(argument instanceof WritableConstantIntObjectInspector)) {
+      throw new UDFArgumentTypeException(index,
+          "SURROGATE_KEY input only takes constant integers, got " + argument.getTypeName());
+    }
+    WritableConstantIntObjectInspector constant = (WritableConstantIntObjectInspector)argument;
+    if (constant.getWritableConstantValue() == null) {
+      throw new UDFArgumentTypeException(index, "SURROGATE_KEY input must not be NULL");
+    }
+    return constant.getWritableConstantValue().get();
+  }
+
+  /**
+   * Keeps the context when it is a TezContext; any other context is ignored.
+   *
+   * @param context the execution context handed to the function
+   */
+  @Override
+  public void configure(MapredContext context) {
+    if (context instanceof TezContext) {
+      this.context = (TezContext)context;
+    }
+  }
+
+  /**
+   * Sets the write ID that fills the most significant field of every generated key.
+   *
+   * @param writeId the write ID; -1 is treated by evaluate as "not set"
+   */
+  public void setWriteId(long writeId) {
+    this.writeId = writeId;
+  }
+
+  /**
+   * Returns the next key for the task that is evaluating the function. The row ID starts at 0
+   * and restarts whenever the task index differs from the one seen by the previous call.
+   *
+   * @param arguments not read
+   * @return a LongWritable holding write ID, task ID and row ID packed into one long
+   * @throws HiveException if the write ID was never set, no Tez context was configured, or
+   *         the write ID, task ID or row ID does not fit in its field
+   */
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    if (writeId == -1) {
+      throw new HiveException("Could not obtain Write ID for the surrogate_key function");
+    }
+    if (context == null) {
+      // Without this check the task index lookup below fails with a NullPointerException.
+      throw new HiveException("Could not obtain Tez context for the surrogate_key function");
+    }
+
+    long taskId = context.getTezProcessorContext().getTaskIndex();
+    long rowId = (lastTaskId != -1 && taskId == lastTaskId) ? lastRowId + 1 : 0;
+
+    if (writeId > maxWriteId) {
+      throw new HiveException(String.format("Write ID is out of range (%d bits) in surrogate_key", writeIdBits));
+    }
+    if (taskId > maxTaskId) {
+      throw new HiveException(String.format("Task ID is out of range (%d bits) in surrogate_key", taskIdBits));
+    }
+    if (rowId > maxRowId) {
+      throw new HiveException(String.format("Row ID is out of range (%d bits) in surrogate_key", rowIdBits));
+    }
+
+    long uniqueId = (writeId << (taskIdBits + rowIdBits)) + (taskId << rowIdBits) + rowId;
+
+    lastTaskId = taskId;
+    lastRowId = rowId;
+
+    return new LongWritable(uniqueId);
+  }
+
+  /**
+   * Renders the call for display, including its arguments when there are any.
+   *
+   * @param children display strings of the arguments; may be null or empty
+   * @return "SURROGATE_KEY()" for the no-argument form, otherwise the arguments joined by ", "
+   */
+  @Override
+  public String getDisplayString(String[] children) {
+    String renderedArguments = (children == null) ? "" : String.join(", ", children);
+    return "SURROGATE_KEY(" + renderedArguments + ")";
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFSurrogateKey.java b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFSurrogateKey.java
new file mode 100644
index 0000000..f21f92f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFSurrogateKey.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for GenericUDFSurrogateKey, driven through a mocked Tez context whose task index is
+ * 0 on the first two evaluations and 1 on the third.
+ */
+public class TestGenericUDFSurrogateKey extends TestCase {
+
+  /** No arguments: the write ID is shifted left by 40 bits and the task index by 24. */
+  public void testSurrogateKeyDefault() throws HiveException {
+    GenericUDFSurrogateKey surrogateKey = newConfiguredUdf(new ObjectInspector[0]);
+    DeferredObject[] noArgs = new DeferredObject[0];
+    long writeIdPart = 1L << 40;
+
+    assertNextKey(writeIdPart, surrogateKey, noArgs);
+    assertNextKey(writeIdPart + 1, surrogateKey, noArgs);
+    assertNextKey(writeIdPart + (1L << 24), surrogateKey, noArgs);
+  }
+
+  /** Arguments (10, 10): the write ID is shifted left by 54 bits and the task index by 44. */
+  public void testSurrogateKeyBitsSet() throws HiveException {
+    ObjectInspector[] bitArguments = {intConstant(10), intConstant(10)};
+    GenericUDFSurrogateKey surrogateKey = newConfiguredUdf(bitArguments);
+    DeferredObject[] noArgs = new DeferredObject[0];
+    long writeIdPart = 1L << 54;
+
+    assertNextKey(writeIdPart, surrogateKey, noArgs);
+    assertNextKey(writeIdPart + 1, surrogateKey, noArgs);
+    assertNextKey(writeIdPart + (1L << 44), surrogateKey, noArgs);
+  }
+
+  /** Builds a constant int inspector holding the given value. */
+  private static ConstantObjectInspector intConstant(int value) {
+    return PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+        TypeInfoFactory.intTypeInfo, new IntWritable(value));
+  }
+
+  /**
+   * Creates the function, initializes it with the given arguments, attaches the mocked Tez
+   * context and sets the write ID to 1.
+   */
+  private static GenericUDFSurrogateKey newConfiguredUdf(ObjectInspector[] bitArguments)
+      throws HiveException {
+    GenericUDFSurrogateKey surrogateKey = new GenericUDFSurrogateKey();
+
+    ProcessorContext processorContext = Mockito.mock(ProcessorContext.class);
+    TezContext tezContext = Mockito.mock(TezContext.class);
+    when(tezContext.getTezProcessorContext()).thenReturn(processorContext);
+    // Consecutive answers: 0, then 0, then 1.
+    when(processorContext.getTaskIndex()).thenReturn(0, 0, 1);
+
+    surrogateKey.initialize(bitArguments);
+    surrogateKey.configure(tezContext);
+    surrogateKey.setWriteId(1);
+    return surrogateKey;
+  }
+
+  /** Evaluates the function once and compares the produced key with the expected one. */
+  private static void assertNextKey(long expected, GenericUDFSurrogateKey surrogateKey, DeferredObject[] args)
+      throws HiveException {
+    LongWritable actual = (LongWritable)surrogateKey.evaluate(args);
+    assertEquals("surrogate_key() test ", expected, actual.get());
+  }
+}
diff --git a/ql/src/test/results/clientpositive/show_functions.q.out b/ql/src/test/results/clientpositive/show_functions.q.out
index 8d41e78..b4ba322 100644
--- a/ql/src/test/results/clientpositive/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/show_functions.q.out
@@ -254,6 +254,7 @@ substr
 substring
 substring_index
 sum
+surrogate_key
 tan
 to_date
 to_epoch_milli