diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f0c129b..a12475a 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1791,8 +1791,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager",
         "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive\n" +
         "transactions, which also requires appropriate settings for hive.compactor.initiator.on,\n" +
-        "hive.compactor.worker.threads, hive.support.concurrency (true), hive.enforce.bucketing\n" +
-        "(true), and hive.exec.dynamic.partition.mode (nonstrict).\n" +
+        "hive.compactor.worker.threads, hive.support.concurrency (true),\n" +
+        "and hive.exec.dynamic.partition.mode (nonstrict).\n" +
         "The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides\n" +
         "no transactions."),
     HIVE_TXN_STRICT_LOCKING_MODE("hive.txn.strict.locking.mode", true, "In strict mode non-ACID\n" +
@@ -1920,6 +1920,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,
       "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +
       "table there is at most 1 matching row in the source table per SQL Specification."),
+    EK_PROP("fix.dedup", true, ""),
 
     // For Druid storage handler
     HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY",
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 789d2a3..db4b0f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer.BUCKET_NUMBER_COL_NAME;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
 
 import java.io.IOException;
@@ -186,7 +187,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       keyEval = new ExprNodeEvaluator[keys.size()];
       int i = 0;
       for (ExprNodeDesc e : keys) {
-        if (e instanceof ExprNodeConstantDesc && ("_bucket_number").equals(((ExprNodeConstantDesc)e).getValue())) {
+        if (e instanceof ExprNodeConstantDesc && (BUCKET_NUMBER_COL_NAME).equals(((ExprNodeConstantDesc)e).getValue())) {
          buckColIdxInKeyForAcid = i;
        }
        keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
      }
@@ -319,8 +320,8 @@ public void process(Object row, int tag) throws HiveException {
       // TODO: this is fishy - we init object inspectors based on first tag. We
       //       should either init for each tag, or if rowInspector doesn't really
       //       matter, then we can create this in ctor and get rid of firstRow.
-      if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
-          conf.getWriteType() == AcidUtils.Operation.DELETE) {
+      if (false && (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+          conf.getWriteType() == AcidUtils.Operation.DELETE)) {
         assert rowInspector instanceof StructObjectInspector :
             "Expected rowInspector to be instance of StructObjectInspector but it is a " +
             rowInspector.getClass().getName();
@@ -360,8 +361,8 @@ public void process(Object row, int tag) throws HiveException {
         if (bucketEval != null) {
           bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
           cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber));
-        } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
-            conf.getWriteType() == AcidUtils.Operation.DELETE) {
+        } else if (false && (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+            conf.getWriteType() == AcidUtils.Operation.DELETE)) {
           // In the non-partitioned case we still want to compute the bucket number for updates and
           // deletes.
           bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
@@ -369,6 +370,10 @@ public void process(Object row, int tag) throws HiveException {
             cachedKeys[0][buckColIdxInKeyForAcid] = new Text(String.valueOf(bucketNumber));
           }
         }
+        if (buckColIdxInKeyForAcid != -1) {
+          //why is this called forAcid? SPDO seems to create this for any bucketed table
+          cachedKeys[0][buckColIdxInKeyForAcid] = new Text(String.valueOf(bucketNumber));
+        }
 
         HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
         int distKeyLength = firstKey.getDistKeyLength();
@@ -427,8 +432,8 @@ public void process(Object row, int tag) throws HiveException {
   }
 
   private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
-    if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
-        conf.getWriteType() == AcidUtils.Operation.DELETE) {
+    if (false && (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+        conf.getWriteType() == AcidUtils.Operation.DELETE)) {
       // We don't need to evaluate the hash code. Instead read the bucket number directly from
       // the row. I don't need to evaluate any expressions as I know I am reading the ROW__ID
       // column directly.
@@ -492,7 +497,7 @@ private int computeHashCode(Object row, int buckNum) throws HiveException {
         }
         keyHashCode = random.nextInt();
       } else {
-        keyHashCode = 1;
+        keyHashCode = 1;//todo: makes no sense
       }
     } else {
       Object[] bucketFieldValues = new Object[partitionEval.length];
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 926386b..2779f0c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -81,7 +81,7 @@
  */
 public class SortedDynPartitionOptimizer extends Transform {
 
-  private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";
+  public static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";
 
   @Override
   public ParseContext transform(ParseContext pCtx) throws SemanticException {
@@ -194,6 +194,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         sortPositions = Arrays.asList(0);
         sortOrder = Arrays.asList(1); // 1 means asc, could really use enum here in the thrift if
         bucketColumns = new ArrayList<>(); // Bucketing column is already present in ROW__ID, which is specially handled in ReduceSink
+        //ROW__ID is always the 1st column of Insert representing Update/Delete and we wrap it in UDFToInteger somewhere which extracts bucketId from it
+        bucketColumns.add(new ExprNodeColumnDesc(fsParent.getSchema().getSignature().get(0)));
       } else {
         if (!destTable.getSortCols().isEmpty()) {
           // Sort columns specified by table
@@ -247,7 +249,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         }
       }
       RowSchema selRS = new RowSchema(fsParent.getSchema());
-      if (!bucketColumns.isEmpty() || fsOp.getConf().getWriteType() == Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) {
+      if (!bucketColumns.isEmpty()) {
         descs.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, ReduceField.KEY.toString()+".'"+BUCKET_NUMBER_COL_NAME+"'", null, false));
         colNames.add("'"+BUCKET_NUMBER_COL_NAME+"'");
         ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo, selRS.getSignature().get(0).getTabAlias(), true, true);
@@ -268,7 +270,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
 
       // Set if partition sorted or partition bucket sorted
       fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
-      if (bucketColumns.size() > 0 || fsOp.getConf().getWriteType() == Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) {
+      if (bucketColumns.size() > 0) {
         fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_BUCKET_SORTED);
       }
 
@@ -441,7 +443,7 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions,
       int numPartAndBuck = partitionPositions.size();
 
       keyColsPosInVal.addAll(partitionPositions);
-      if (!bucketColumns.isEmpty() || writeType == Operation.DELETE || writeType == Operation.UPDATE) {
+      if (!bucketColumns.isEmpty()) {
         keyColsPosInVal.add(-1);
         numPartAndBuck += 1;
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
index d53efbf..c3ba97e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
@@ -256,9 +256,9 @@ protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReduc
      * If parent RS has not been assigned any partitioning column, we will use
      * partitioning columns (if exist) of child RS.
      */
-    protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+    protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, ReduceSinkDeduplicateProcCtx ctx)
         throws SemanticException {
-      int[] result = extractMergeDirections(cRS, pRS, minReducer);
+      int[] result = extractMergeDirections(cRS, pRS, ctx.minReducer());
       if (result == null) {
         return false;
       }
@@ -337,6 +337,9 @@ protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minR
           pRS.getConf().setKeySerializeInfo(keyTable);
         }
       }
+      if(ctx.getPctx().getConf().getBoolVar(ConfVars.EK_PROP)) {
+        pRS.getConf().setWriteType(cRS.getConf().getWriteType());
+      }
       return true;
     }
 
@@ -648,7 +651,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup
       ReduceSinkOperator pRS =
           CorrelationUtilities.findPossibleParent(
               pGBY, ReduceSinkOperator.class, dedupCtx.trustScript());
-      if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+      if (pRS != null && merge(cRS, pRS, dedupCtx)) {
         CorrelationUtilities.replaceReduceSinkWithSelectOperator(
             cRS, dedupCtx.getPctx(), dedupCtx);
         pRS.getConf().setDeduplicated(true);
@@ -671,7 +674,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
       }
 
       ReduceSinkOperator pRS = CorrelationUtilities.getSingleParent(pGBY, ReduceSinkOperator.class);
-      if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+      if (pRS != null && merge(cRS, pRS, dedupCtx)) {
         CorrelationUtilities.removeReduceSinkForGroupBy(
             cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
         pRS.getConf().setDeduplicated(true);
@@ -744,7 +747,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup
         return true;
       }
       // Normal deduplication
-      if (merge(cRS, pRS, dedupCtx.minReducer())) {
+      if (merge(cRS, pRS, dedupCtx)) {
         CorrelationUtilities.replaceReduceSinkWithSelectOperator(
             cRS, dedupCtx.getPctx(), dedupCtx);
         pRS.getConf().setDeduplicated(true);
@@ -763,7 +766,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
       ReduceSinkOperator pRS =
           CorrelationUtilities.findPossibleParent(
               start, ReduceSinkOperator.class, dedupCtx.trustScript());
-      if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+      if (pRS != null && merge(cRS, pRS, dedupCtx)) {
         if (dedupCtx.getPctx().getConf().getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
           return false;
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 5cc1c45..4716adc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -438,7 +438,10 @@ public void setRemovedReduceSinkBucketSort(boolean removedReduceSinkBucketSort)
   public DPSortState getDpSortState() {
     return dpSortState;
   }
-
+  @Explain(displayName = "Dp Sort State")
+  public String getDpSortStateString() {
+    return getDpSortState() == DPSortState.NONE ? null : getDpSortState().toString();
+  }
   public void setDpSortState(DPSortState dpSortState) {
     this.dpSortState = dpSortState;
   }
@@ -450,7 +453,10 @@ public void setWriteType(AcidUtils.Operation type) {
   public AcidUtils.Operation getWriteType() {
     return writeType;
   }
-
+  @Explain(displayName = "Write Type")
+  public String getWriteTypeString() {
+    return getWriteType() == AcidUtils.Operation.NOT_ACID ? null : getWriteType().toString();
+  }
   public void setTransactionId(long id) {
     txnId = id;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index d77a223..8488112 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -124,7 +124,7 @@ private ReducerTraits(int trait) {
   private EnumSet<ReducerTraits> reduceTraits = EnumSet.of(ReducerTraits.UNSET);
 
   // Write type, since this needs to calculate buckets differently for updates and deletes
-  private AcidUtils.Operation writeType;
+  private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
 
   // whether this RS is deduplicated
   private transient boolean isDeduplicated = false;
@@ -159,7 +159,7 @@ public ReduceSinkDesc(ArrayList<ExprNodeDesc> keyCols,
     this.distinctColumnIndices = distinctColumnIndices;
     this.setNumBuckets(-1);
     this.setBucketCols(null);
-    this.writeType = writeType;
+    this.setWriteType(writeType);
     this.vectorDesc = null;
   }
 
@@ -193,6 +193,7 @@ public Object clone() {
     if (vectorDesc != null) {
       throw new RuntimeException("Clone with vectorization desc not supported");
     }
+    desc.setWriteType(this.getWriteType());
     desc.vectorDesc = null;
     return desc;
   }
@@ -475,7 +476,13 @@ public final void setReducerTraits(EnumSet<ReducerTraits> traits) {
   public AcidUtils.Operation getWriteType() {
     return writeType;
   }
-
+  @Explain(displayName = "Write Type")
+  public String getWriteTypeString() {
+    return getWriteType() == AcidUtils.Operation.NOT_ACID ? null : getWriteType().toString();
+  }
+  public void setWriteType(AcidUtils.Operation wt) {
+    writeType = wt;
+  }
   public boolean isDeduplicated() {
     return isDeduplicated;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 1aef7ac..05b6fc4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -266,7 +266,7 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
 
     if (parsedDeltas.size() == 0 && dir.getOriginalFiles() == null) {
       // Skip compaction if there's no delta files AND there's no original files
-      LOG.error("No delta files or original files found to compact in " + sd.getLocation());
+      LOG.error("No delta files or original files found to compact in " + sd.getLocation() + " for compactionId=" + ci.id);
       return;
     }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6718ae9..aff6457 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1454,6 +1454,14 @@ public void testMergeWithPredicate() throws Exception {
    */
   @Test
   public void testMerge2() throws Exception {
+    d.destroy();
+    HiveConf c = new HiveConf(hiveConf);
+    c.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+    c.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, false);
+    c.setBoolVar(HiveConf.ConfVars.EK_PROP, true);
+
+    d = new Driver(c);
+    d.setMaxRows(1000);
     int[][] baseValsOdd = {{5,5},{11,11}};
     int[][] baseValsEven = {{2,2},{4,44}};
     runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
@@ -1466,9 +1474,10 @@ public void testMerge2() throws Exception {
       " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
       "WHEN MATCHED THEN UPDATE set b = source.b2 " +
       "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";//AND b < 1
+    r = runStatementOnDriver("explain " + query);
+    logResuts(r, "Explain1", "");
+    r = runStatementOnDriver(query);
-    //r = runStatementOnDriver("explain " + query);
-    //logResuts(r, "Explain logical1", "");
 
     r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
     int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}};
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 6945a67..ba44bae 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -617,6 +617,11 @@ public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucke
    * @param hashCode as produced by {@link #getBucketHashCode(Object[], ObjectInspector[])}
    */
  public static int getBucketNumber(int hashCode, int numberOfBuckets) {
+    if(numberOfBuckets <= 0) {
+      //note that (X % 0) is illegal and (X % -1) = 0
+      // -1 is a common default when the value is missing
+      throw new IllegalArgumentException("Number of Buckets must be > 0");
+    }
    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
  }
  /**