diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9c721ed..06680c1 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1791,8 +1791,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager", "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive\n" + "transactions, which also requires appropriate settings for hive.compactor.initiator.on,\n" + - "hive.compactor.worker.threads, hive.support.concurrency (true), hive.enforce.bucketing\n" + - "(true), and hive.exec.dynamic.partition.mode (nonstrict).\n" + + "hive.compactor.worker.threads, hive.support.concurrency (true),\n" + + "and hive.exec.dynamic.partition.mode (nonstrict).\n" + "The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides\n" + "no transactions."), HIVE_TXN_STRICT_LOCKING_MODE("hive.txn.strict.locking.mode", true, "In strict mode non-ACID\n" + @@ -1920,6 +1920,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true, "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" + "table there is at most 1 matching row in the source table per SQL Specification."), + EK_PROP("fix.dedup", true, ""), // For Druid storage handler HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY", diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 789d2a3..3cbc4e3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -18,16 +18,15 @@ package org.apache.hadoop.hive.ql.exec; +import static org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer.BUCKET_NUMBER_COL_NAME; import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Random; -import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -35,7 +34,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; @@ -44,15 +42,12 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.Serializer; -import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; import 
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; @@ -84,7 +79,6 @@ private boolean firstRow; private transient int tag; private boolean skipTag = false; - private transient InspectableObject tempInspectableObject = new InspectableObject(); private transient int[] valueIndex; // index for value(+ from keys, - from values) protected transient OutputCollector out; @@ -143,12 +137,6 @@ // TODO: we only ever use one row of these at a time. Why do we need to cache multiple? protected transient Object[][] cachedKeys; - private StructField recIdField; // field to look for record identifier in - private StructField bucketField; // field to look for bucket in record identifier - private StructObjectInspector acidRowInspector; // row inspector used by acid options - private StructObjectInspector recIdInspector; // OI for the record identifier - private IntObjectInspector bucketInspector; // OI for the bucket field in the record id - protected transient long numRows = 0; protected transient long cntr = 1; protected transient long logEveryNRows = 0; @@ -186,7 +174,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { keyEval = new ExprNodeEvaluator[keys.size()]; int i = 0; for (ExprNodeDesc e : keys) { - if (e instanceof ExprNodeConstantDesc && ("_bucket_number").equals(((ExprNodeConstantDesc)e).getValue())) { + if (e instanceof ExprNodeConstantDesc && (BUCKET_NUMBER_COL_NAME).equals(((ExprNodeConstantDesc)e).getValue())) { buckColIdxInKeyForAcid = i; } keyEval[i++] = ExprNodeEvaluatorFactory.get(e); @@ -319,20 +307,6 @@ public void process(Object row, int tag) throws HiveException { // TODO: this is fishy - we init object inspectors based on first tag. We // should either init for each tag, or if rowInspector doesn't really // matter, then we can create this in ctor and get rid of firstRow. - if (conf.getWriteType() == AcidUtils.Operation.UPDATE || - conf.getWriteType() == AcidUtils.Operation.DELETE) { - assert rowInspector instanceof StructObjectInspector : - "Expected rowInspector to be instance of StructObjectInspector but it is a " + - rowInspector.getClass().getName(); - acidRowInspector = (StructObjectInspector)rowInspector; - // The record identifier is always in the first column - recIdField = acidRowInspector.getAllStructFieldRefs().get(0); - recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector(); - // The bucket field is in the second position - bucketField = recIdInspector.getAllStructFieldRefs().get(1); - bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector(); - } - if (isLogInfoEnabled) { LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys()); @@ -360,14 +334,10 @@ public void process(Object row, int tag) throws HiveException { if (bucketEval != null) { bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber)); - } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE || - conf.getWriteType() == AcidUtils.Operation.DELETE) { - // In the non-partitioned case we still want to compute the bucket number for updates and - // deletes. 
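With the acid-specific branch removed here, every write path obtains the bucket number the same way: evaluate the bucket expressions against the row and run the resulting values through the hash-and-modulo of ObjectInspectorUtils.getBucketNumber. A minimal standalone sketch of that shape, assuming plain functions as stand-ins for ExprNodeEvaluator and for the ObjectInspector-based hashing (illustrative only, not the real call chain):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch of the unified computeBucketNumber() path: evaluate the bucket
// columns, hash the values, then apply the same formula as
// ObjectInspectorUtils.getBucketNumber(hashCode, numberOfBuckets).
final class ComputeBucketNumberSketch {

  static int computeBucketNumber(Object[] row, List<Function<Object[], Object>> bucketEval, int numBuckets) {
    Object[] bucketFieldValues = new Object[bucketEval.size()];
    for (int i = 0; i < bucketEval.size(); i++) {
      bucketFieldValues[i] = bucketEval.get(i).apply(row);  // stands in for ExprNodeEvaluator.evaluate(row)
    }
    int hashCode = Arrays.hashCode(bucketFieldValues);      // Hive hashes via the ObjectInspectors instead
    return (hashCode & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    Object[] row = {42, "acme"};
    List<Function<Object[], Object>> bucketEval = Collections.singletonList(r -> r[0]);
    System.out.println(computeBucketNumber(row, bucketEval, 4)); // table bucketed on the first column into 4 buckets
  }
}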
- bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); - if (buckColIdxInKeyForAcid != -1) { - cachedKeys[0][buckColIdxInKeyForAcid] = new Text(String.valueOf(bucketNumber)); - } + } + if (buckColIdxInKeyForAcid != -1) { + //todo: why is this called forAcid? SPDO seems to create this for any bucketed table + cachedKeys[0][buckColIdxInKeyForAcid] = new Text(String.valueOf(bucketNumber)); } HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null); @@ -427,24 +397,11 @@ public void process(Object row, int tag) throws HiveException { } private int computeBucketNumber(Object row, int numBuckets) throws HiveException { - if (conf.getWriteType() == AcidUtils.Operation.UPDATE || - conf.getWriteType() == AcidUtils.Operation.DELETE) { - // We don't need to evaluate the hash code. Instead read the bucket number directly from - // the row. I don't need to evaluate any expressions as I know I am reading the ROW__ID - // column directly. - Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField); - int buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField)); - if (isLogTraceEnabled) { - LOG.trace("Acid choosing bucket number " + buckNum); - } - return buckNum; - } else { - Object[] bucketFieldValues = new Object[bucketEval.length]; - for (int i = 0; i < bucketEval.length; i++) { - bucketFieldValues[i] = bucketEval[i].evaluate(row); - } - return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets); + Object[] bucketFieldValues = new Object[bucketEval.length]; + for (int i = 0; i < bucketEval.length; i++) { + bucketFieldValues[i] = bucketEval[i].evaluate(row); } + return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets); } private void populateCachedDistributionKeys(Object row, int index) throws HiveException { @@ -477,23 +434,35 @@ protected final int computeMurmurHash(HiveKey firstKey) { return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); } + private boolean isUpdateDelete() { + return getConf().getWriteType() == AcidUtils.Operation.UPDATE || + getConf().getWriteType() == AcidUtils.Operation.DELETE; + } + /** + * ToDo: AssertThis + * For Acid Update/Delete case, we expect a single partitionEval of the form + * UDFToInteger(ROW__ID) and buckNum == -1 so that the result of this method + * is to return the bucketId extracted from ROW__ID. + */ private int computeHashCode(Object row, int buckNum) throws HiveException { + if(buckNum > 0 && isUpdateDelete()) { + //is this the right place to assert this? + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < partitionEval.length; i++) { + sb.append("partitionEval[").append(i).append("]=").append(partitionEval[i]); + } + throw new IllegalStateException("buckNum=" + buckNum + " for " + getConf().getWriteType() + " " + sb.toString()); + } // Evaluate the HashCode int keyHashCode = 0; if (partitionEval.length == 0) { - // If no partition cols and not doing an update or delete, just distribute the data uniformly + // If no partition cols, just distribute the data uniformly // to provide better load balance. If the requirement is to have a single reducer, we should // set the number of reducers to 1. Use a constant seed to make the code deterministic. - // For acid operations make sure to send all records with the same key to the same - // FileSinkOperator, as the RecordUpdater interface can't manage multiple writers for a file. 
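The javadoc added just above on computeHashCode() is what makes the removed acid-only branches redundant: for an ACID UPDATE/DELETE the single partition expression is UDFToInteger(ROW__ID), and per the comment added in SortedDynPartitionOptimizer below that conversion extracts the bucket id from ROW__ID, so hashing that one integer column reproduces the bucket the row was originally written to. A toy model of that invariant, with simplified stand-ins for RecordIdentifier and UDFToInteger (illustrative only, not the real Hive classes):

// Toy model of the invariant computeHashCode() now relies on for ACID UPDATE/DELETE:
// the only partition column is UDFToInteger(ROW__ID), which yields the bucket id stored
// in ROW__ID, so the partition hash is already the row's original bucket.
final class RowIdBucketModel {

  /** Simplified stand-in for Hive's RecordIdentifier struct behind ROW__ID. */
  static final class RowId {
    final long transactionId;
    final int bucketId;
    final long rowId;
    RowId(long transactionId, int bucketId, long rowId) {
      this.transactionId = transactionId;
      this.bucketId = bucketId;
      this.rowId = rowId;
    }
  }

  /** Stand-in for UDFToInteger(ROW__ID) as described by the patch comment. */
  static int toInteger(RowId id) {
    return id.bucketId;
  }

  public static void main(String[] args) {
    RowId rowId = new RowId(17L, 3, 42L);
    int partitionHash = toInteger(rowId);                  // only partition column for UPDATE/DELETE
    int bucket = (partitionHash & Integer.MAX_VALUE) % 4;  // getBucketNumber formula with 4 buckets
    System.out.println(bucket);                            // 3 -- the bucket the row was read from
  }
}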
- if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) { - if (random == null) { - random = new Random(12345); - } - keyHashCode = random.nextInt(); - } else { - keyHashCode = 1; + if (random == null) { + random = new Random(12345); } + keyHashCode = random.nextInt(); } else { Object[] bucketFieldValues = new Object[partitionEval.length]; for(int i = 0; i < partitionEval.length; i++) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index 926386b..2779f0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -81,7 +81,7 @@ */ public class SortedDynPartitionOptimizer extends Transform { - private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number"; + public static final String BUCKET_NUMBER_COL_NAME = "_bucket_number"; @Override public ParseContext transform(ParseContext pCtx) throws SemanticException { @@ -194,6 +194,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, sortPositions = Arrays.asList(0); sortOrder = Arrays.asList(1); // 1 means asc, could really use enum here in the thrift if bucketColumns = new ArrayList<>(); // Bucketing column is already present in ROW__ID, which is specially handled in ReduceSink + //ROW__ID is always the 1st column of Insert representing Update/Delete and we wrap it in UDFToInteger somewhere which extracts bucketId from it + bucketColumns.add(new ExprNodeColumnDesc(fsParent.getSchema().getSignature().get(0))); } else { if (!destTable.getSortCols().isEmpty()) { // Sort columns specified by table @@ -247,7 +249,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } } RowSchema selRS = new RowSchema(fsParent.getSchema()); - if (!bucketColumns.isEmpty() || fsOp.getConf().getWriteType() == Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) { + if (!bucketColumns.isEmpty()) { descs.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, ReduceField.KEY.toString()+".'"+BUCKET_NUMBER_COL_NAME+"'", null, false)); colNames.add("'"+BUCKET_NUMBER_COL_NAME+"'"); ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo, selRS.getSignature().get(0).getTabAlias(), true, true); @@ -268,7 +270,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // Set if partition sorted or partition bucket sorted fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED); - if (bucketColumns.size() > 0 || fsOp.getConf().getWriteType() == Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) { + if (bucketColumns.size() > 0) { fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_BUCKET_SORTED); } @@ -441,7 +443,7 @@ public ReduceSinkOperator getReduceSinkOp(List partitionPositions, int numPartAndBuck = partitionPositions.size(); keyColsPosInVal.addAll(partitionPositions); - if (!bucketColumns.isEmpty() || writeType == Operation.DELETE || writeType == Operation.UPDATE) { + if (!bucketColumns.isEmpty()) { keyColsPosInVal.add(-1); numPartAndBuck += 1; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java index d53efbf..c3ba97e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java @@ -256,9 +256,9 @@ protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReduc * If parent RS has not been assigned any partitioning column, we will use * partitioning columns (if exist) of child RS. */ - protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer) + protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, ReduceSinkDeduplicateProcCtx ctx) throws SemanticException { - int[] result = extractMergeDirections(cRS, pRS, minReducer); + int[] result = extractMergeDirections(cRS, pRS, ctx.minReducer()); if (result == null) { return false; } @@ -337,6 +337,9 @@ protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minR pRS.getConf().setKeySerializeInfo(keyTable); } } + if(ctx.getPctx().getConf().getBoolVar(ConfVars.EK_PROP)) { + pRS.getConf().setWriteType(cRS.getConf().getWriteType()); + } return true; } @@ -648,7 +651,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup ReduceSinkOperator pRS = CorrelationUtilities.findPossibleParent( pGBY, ReduceSinkOperator.class, dedupCtx.trustScript()); - if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { + if (pRS != null && merge(cRS, pRS, dedupCtx)) { CorrelationUtilities.replaceReduceSinkWithSelectOperator( cRS, dedupCtx.getPctx(), dedupCtx); pRS.getConf().setDeduplicated(true); @@ -671,7 +674,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, } ReduceSinkOperator pRS = CorrelationUtilities.getSingleParent(pGBY, ReduceSinkOperator.class); - if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { + if (pRS != null && merge(cRS, pRS, dedupCtx)) { CorrelationUtilities.removeReduceSinkForGroupBy( cRS, cGBY, dedupCtx.getPctx(), dedupCtx); pRS.getConf().setDeduplicated(true); @@ -744,7 +747,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup return true; } // Normal deduplication - if (merge(cRS, pRS, dedupCtx.minReducer())) { + if (merge(cRS, pRS, dedupCtx)) { CorrelationUtilities.replaceReduceSinkWithSelectOperator( cRS, dedupCtx.getPctx(), dedupCtx); pRS.getConf().setDeduplicated(true); @@ -763,7 +766,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ReduceSinkOperator pRS = CorrelationUtilities.findPossibleParent( start, ReduceSinkOperator.class, dedupCtx.trustScript()); - if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { + if (pRS != null && merge(cRS, pRS, dedupCtx)) { if (dedupCtx.getPctx().getConf().getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { return false; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 5cc1c45..4716adc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -438,7 +438,10 @@ public void setRemovedReduceSinkBucketSort(boolean removedReduceSinkBucketSort) public DPSortState getDpSortState() { return dpSortState; } - + @Explain(displayName = "Dp Sort State") + public String getDpSortStateString() { + return getDpSortState() == DPSortState.NONE ? 
null : getDpSortState().toString(); + } public void setDpSortState(DPSortState dpSortState) { this.dpSortState = dpSortState; } @@ -450,7 +453,10 @@ public void setWriteType(AcidUtils.Operation type) { public AcidUtils.Operation getWriteType() { return writeType; } - + @Explain(displayName = "Write Type") + public String getWriteTypeString() { + return getWriteType() == AcidUtils.Operation.NOT_ACID ? null : getWriteType().toString(); + } public void setTransactionId(long id) { txnId = id; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index d77a223..54b86df 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -124,7 +124,7 @@ private ReducerTraits(int trait) { private EnumSet reduceTraits = EnumSet.of(ReducerTraits.UNSET); // Write type, since this needs to calculate buckets differently for updates and deletes - private AcidUtils.Operation writeType; + private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; // whether this RS is deduplicated private transient boolean isDeduplicated = false; @@ -159,7 +159,7 @@ public ReduceSinkDesc(ArrayList keyCols, this.distinctColumnIndices = distinctColumnIndices; this.setNumBuckets(-1); this.setBucketCols(null); - this.writeType = writeType; + this.setWriteType(writeType); this.vectorDesc = null; } @@ -193,6 +193,7 @@ public Object clone() { if (vectorDesc != null) { throw new RuntimeException("Clone with vectorization desc not supported"); } + desc.setWriteType(this.getWriteType()); desc.vectorDesc = null; return desc; } @@ -475,7 +476,13 @@ public final void setReducerTraits(EnumSet traits) { public AcidUtils.Operation getWriteType() { return writeType; } - + @Explain(displayName = "Write Type") + public String getWriteTypeString() { + return getWriteType() == AcidUtils.Operation.NOT_ACID ? 
null : getWriteType().toString(); + } + public void setWriteType(AcidUtils.Operation wt) { + writeType = wt; + } public boolean isDeduplicated() { return isDeduplicated; } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 1aef7ac..05b6fc4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -266,7 +266,7 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, if (parsedDeltas.size() == 0 && dir.getOriginalFiles() == null) { // Skip compaction if there's no delta files AND there's no original files - LOG.error("No delta files or original files found to compact in " + sd.getLocation()); + LOG.error("No delta files or original files found to compact in " + sd.getLocation() + " for compactionId=" + ci.id); return; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 6718ae9..aff6457 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1454,6 +1454,14 @@ public void testMergeWithPredicate() throws Exception { */ @Test public void testMerge2() throws Exception { + d.destroy(); + HiveConf c = new HiveConf(hiveConf); + c.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); + c.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, false); + c.setBoolVar(HiveConf.ConfVars.EK_PROP, true); + + d = new Driver(c); + d.setMaxRows(1000); int[][] baseValsOdd = {{5,5},{11,11}}; int[][] baseValsEven = {{2,2},{4,44}}; runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd)); @@ -1466,9 +1474,10 @@ public void testMerge2() throws Exception { " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " + "WHEN MATCHED THEN UPDATE set b = source.b2 " + "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";//AND b < 1 + r = runStatementOnDriver("explain " + query); + logResuts(r, "Explain1", ""); + r = runStatementOnDriver(query); - //r = runStatementOnDriver("explain " + query); - //logResuts(r, "Explain logical1", ""); r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}}; diff --git ql/src/test/results/clientpositive/autoColumnStats_4.q.out ql/src/test/results/clientpositive/autoColumnStats_4.q.out index c7b9b4f..d7cdcb5 100644 --- ql/src/test/results/clientpositive/autoColumnStats_4.q.out +++ ql/src/test/results/clientpositive/autoColumnStats_4.q.out @@ -97,6 +97,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 10 Data size: 2150 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: varchar(128)) + Write Type: INSERT Reduce Operator Tree: Select Operator expressions: VALUE._col0 (type: int), VALUE._col1 (type: varchar(128)) @@ -110,6 +111,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde name: default.acid_dtt + Write Type: INSERT Select Operator expressions: _col0 (type: int), _col1 (type: varchar(128)) outputColumnNames: a, b diff --git ql/src/test/results/clientpositive/dynpart_sort_optimization_acid2.q.out ql/src/test/results/clientpositive/dynpart_sort_optimization_acid2.q.out 
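The testMerge2() changes above show how the pieces are meant to be exercised together: rebuild the Driver with user-level EXPLAIN off, the MERGE cardinality check off, and the new fix.dedup (EK_PROP) flag on, then EXPLAIN the MERGE so the new "Write Type" and "Dp Sort State" annotations appear in the plan, as the .q.out diffs below reflect. A condensed sketch of that setup using the same calls as the test (not a standalone program; hiveConf, query, runStatementOnDriver and logResuts are the test's own members, and the MERGE statement and teardown are elided):

// Condensed from TestTxnCommands2.testMerge2().
HiveConf c = new HiveConf(hiveConf);
c.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);                 // plain, non-user-level EXPLAIN
c.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, false);
c.setBoolVar(HiveConf.ConfVars.EK_PROP, true);                            // the new fix.dedup switch
Driver d = new Driver(c);
d.setMaxRows(1000);
r = runStatementOnDriver("explain " + query);  // plan should now show "Write Type: UPDATE/DELETE/INSERT"
logResuts(r, "Explain1", "");                  // on Reduce/File Sinks and "Dp Sort State" on DP File Sinks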
index 0b6e992..c781f88 100644 --- ql/src/test/results/clientpositive/dynpart_sort_optimization_acid2.q.out +++ ql/src/test/results/clientpositive/dynpart_sort_optimization_acid2.q.out @@ -40,6 +40,7 @@ STAGE PLANS: Map-reduce partition columns: _col2 (type: string), _col3 (type: string) Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: string) + Dp Sort State: PARTITION_BUCKET_SORTED Reduce Operator Tree: Select Operator expressions: VALUE._col0 (type: string), KEY._col1 (type: string), KEY._col2 (type: string), KEY._col3 (type: string), KEY.'_bucket_number' (type: string) diff --git ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out index eaa394d..0e16ff1 100644 --- ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out +++ ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out @@ -198,6 +198,7 @@ STAGE PLANS: Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -290,6 +291,7 @@ STAGE PLANS: Statistics: Num rows: 10 Data size: 2960 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 10 Data size: 2960 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -363,6 +365,7 @@ STAGE PLANS: Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -435,6 +438,7 @@ STAGE PLANS: Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -575,6 +579,7 @@ STAGE PLANS: Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -667,6 +672,7 @@ STAGE PLANS: Statistics: Num rows: 10 Data size: 2960 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 10 Data size: 2960 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -740,6 +746,7 @@ STAGE PLANS: Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -812,6 +819,7 @@ STAGE PLANS: Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: 
NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -1426,6 +1434,7 @@ STAGE PLANS: Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -1520,6 +1529,7 @@ STAGE PLANS: Statistics: Num rows: 10 Data size: 2960 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 10 Data size: 2960 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -1676,6 +1686,7 @@ STAGE PLANS: Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -2133,6 +2144,7 @@ STAGE PLANS: Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat diff --git ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out index 95a4e0f..1ef0740 100644 --- ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out +++ ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out @@ -155,6 +155,7 @@ STAGE PLANS: Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -247,6 +248,7 @@ STAGE PLANS: Statistics: Num rows: 10 Data size: 240 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 10 Data size: 240 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -320,6 +322,7 @@ STAGE PLANS: Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -392,6 +395,7 @@ STAGE PLANS: Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -532,6 +536,7 @@ STAGE PLANS: Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 4442 Data size: 106611 Basic 
stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -624,6 +629,7 @@ STAGE PLANS: Statistics: Num rows: 10 Data size: 240 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 10 Data size: 240 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -697,6 +703,7 @@ STAGE PLANS: Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -769,6 +776,7 @@ STAGE PLANS: Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -1383,6 +1391,7 @@ STAGE PLANS: Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -1477,6 +1486,7 @@ STAGE PLANS: Statistics: Num rows: 10 Data size: 240 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 10 Data size: 240 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -1633,6 +1643,7 @@ STAGE PLANS: Statistics: Num rows: 2221 Data size: 53305 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 2221 Data size: 53305 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -2090,6 +2101,7 @@ STAGE PLANS: Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_BUCKET_SORTED Statistics: Num rows: 4442 Data size: 106611 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -2464,6 +2476,7 @@ STAGE PLANS: Statistics: Num rows: 429 Data size: 53255 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 429 Data size: 53255 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -2538,6 +2551,7 @@ STAGE PLANS: Statistics: Num rows: 429 Data size: 53255 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 429 Data size: 53255 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -2612,6 +2626,7 @@ STAGE PLANS: Statistics: Num rows: 429 Data size: 53255 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 429 Data size: 53255 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -2686,6 +2701,7 @@ STAGE PLANS: Statistics: Num rows: 
214 Data size: 26565 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 214 Data size: 26565 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -2760,6 +2776,7 @@ STAGE PLANS: Statistics: Num rows: 214 Data size: 26565 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 214 Data size: 26565 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -2834,6 +2851,7 @@ STAGE PLANS: Statistics: Num rows: 214 Data size: 26565 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 214 Data size: 26565 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat diff --git ql/src/test/results/clientpositive/llap/dynpart_sort_optimization2.q.out ql/src/test/results/clientpositive/llap/dynpart_sort_optimization2.q.out index 41a7709..667d980 100644 --- ql/src/test/results/clientpositive/llap/dynpart_sort_optimization2.q.out +++ ql/src/test/results/clientpositive/llap/dynpart_sort_optimization2.q.out @@ -120,6 +120,7 @@ STAGE PLANS: Statistics: Num rows: 2 Data size: 24 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 2 Data size: 24 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat @@ -357,6 +358,7 @@ STAGE PLANS: Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false + Dp Sort State: PARTITION_SORTED Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat diff --git ql/src/test/results/clientpositive/llap/sqlmerge.q.out ql/src/test/results/clientpositive/llap/sqlmerge.q.out index 2a3d7db..f663b70 100644 --- ql/src/test/results/clientpositive/llap/sqlmerge.q.out +++ ql/src/test/results/clientpositive/llap/sqlmerge.q.out @@ -97,6 +97,7 @@ STAGE PLANS: sort order: + Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Write Type: DELETE Filter Operator predicate: ((_col5 <= 8) and (_col0 = _col5)) (type: boolean) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE @@ -110,6 +111,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: _col1 (type: int) + Write Type: UPDATE Filter Operator predicate: (_col0 = _col5) (type: boolean) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE @@ -141,6 +143,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int) + Write Type: INSERT Reducer 3 Execution mode: llap Reduce Operator Tree: @@ -156,6 +159,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde name: default.acidtbl + Write Type: DELETE Reducer 4 Execution mode: llap Reduce Operator Tree: @@ -171,6 +175,7 @@ STAGE PLANS: output format: 
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde name: default.acidtbl + Write Type: UPDATE Reducer 5 Execution mode: llap Reduce Operator Tree: @@ -210,6 +215,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde name: default.acidtbl + Write Type: INSERT Stage: Stage-5 Dependency Collection @@ -335,6 +341,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: _col0 (type: int), _col1 (type: int) + Write Type: INSERT Reducer 3 Execution mode: llap Reduce Operator Tree: @@ -350,6 +357,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde name: default.acidtbl + Write Type: INSERT Stage: Stage-2 Dependency Collection diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java index 6945a67..ba44bae 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java @@ -617,6 +617,11 @@ public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucke * @param hashCode as produced by {@link #getBucketHashCode(Object[], ObjectInspector[])} */ public static int getBucketNumber(int hashCode, int numberOfBuckets) { + if(numberOfBuckets <= 0) { + //note that (X % 0) is illegal and (X % -1) = 0 + // -1 is a common default when the value is missing + throw new IllegalArgumentException("Number of Buckets must be > 0"); + } return (hashCode & Integer.MAX_VALUE) % numberOfBuckets; } /**
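The new guard in getBucketNumber(int, int) is worth a second look: numberOfBuckets is routinely left at -1 when bucketing is not configured (the ReduceSinkDesc constructor above calls setNumBuckets(-1)), and in Java any value % -1 is 0 while % 0 throws ArithmeticException, so a missing bucket count used to collapse every row silently into bucket 0. A small self-contained demonstration of the old and new behavior (the helper below copies the patched formula; it is not the Hive class itself):

// Why the guard matters: modulo by a "missing" bucket count of -1 silently maps every hash to 0.
final class BucketGuardDemo {

  static int getBucketNumber(int hashCode, int numberOfBuckets) {
    if (numberOfBuckets <= 0) {
      throw new IllegalArgumentException("Number of Buckets must be > 0");
    }
    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
  }

  public static void main(String[] args) {
    System.out.println(getBucketNumber(123456789, 4));          // normal case: prints 1
    System.out.println((123456789 & Integer.MAX_VALUE) % -1);   // old behavior with numBuckets=-1: prints 0
    getBucketNumber(123456789, -1);                              // new behavior: IllegalArgumentException
  }
}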