From 496b2d099f7a99938f4bc769d3758d4b6ad9b132 Mon Sep 17 00:00:00 2001
From: Ashutosh Chauhan
Date: Fri, 16 Sep 2016 18:15:26 -0700
Subject: [PATCH] HIVE-14783 : bucketing column should be part of sorting for delete/update operation when spdo is on

---
 .../apache/hadoop/hive/ql/exec/FileSinkOperator.java | 10 +++++-----
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java      |  9 +++++++++
 .../ql/optimizer/SortedDynPartitionOptimizer.java    |  4 ++--
 .../dynpart_sort_optimization_acid.q.out             | 20 ++++++++++----------
 4 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index e386717..eeba6cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -766,19 +766,19 @@ public void process(Object row, int tag) throws HiveException {
       if (fpaths.acidLastBucket != bucketNum) {
         fpaths.acidLastBucket = bucketNum;
         // Switch files
-        fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
-            jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset],
+        fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : ++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+            jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : fpaths.acidFileOffset],
             rowInspector, reporter, 0);
         if (isDebugEnabled) {
           LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
-              fpaths.outPaths[fpaths.acidFileOffset]);
+              fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : fpaths.acidFileOffset]);
         }
       }
       if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
-        fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+        fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : fpaths.acidFileOffset].update(conf.getTransactionId(), row);
       } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
-        fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+        fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
       } else {
         throw new HiveException("Unknown write type " + conf.getWriteType().toString());
       }
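Note on the FileSinkOperator hunk: when the plan is PARTITION_BUCKET_SORTED, rows reach the FileSink already clustered by partition and bucket, so only one RecordUpdater is open at a time and slot 0 of fpaths.updaters is simply reused; otherwise the running acidFileOffset keeps one slot per bucket file. A minimal, self-contained sketch of that index selection follows (the enum and class names are stand-ins, not Hive's DPSortState/FSPaths):

    // UpdaterSlotSketch.java -- sketch only, not the patch's code.
    enum DpSortState { NONE, PARTITION_SORTED, PARTITION_BUCKET_SORTED }

    public class UpdaterSlotSketch {
      private int acidFileOffset = -1;

      // Slot for the next updater when the incoming bucket number changes.
      int slotForNewBucket(DpSortState state) {
        if (state == DpSortState.PARTITION_BUCKET_SORTED) {
          return 0; // input clustered by (partition, bucket): reuse slot 0
        }
        return ++acidFileOffset; // otherwise one slot per bucket file seen
      }

      public static void main(String[] args) {
        UpdaterSlotSketch s = new UpdaterSlotSketch();
        System.out.println(s.slotForNewBucket(DpSortState.PARTITION_BUCKET_SORTED)); // 0
        System.out.println(s.slotForNewBucket(DpSortState.NONE)); // 0
        System.out.println(s.slotForNewBucket(DpSortState.NONE)); // 1
      }
    }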
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index a9885d8..31a7b7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -35,6 +35,8 @@
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -78,6 +80,7 @@
   private transient ObjectInspector[] partitionObjectInspectors;
   private transient ObjectInspector[] bucketObjectInspectors;
   private transient int buckColIdxInKey;
+  private transient int buckColIdxInKeyForAcid = -1;
   private boolean firstRow;
   private transient int tag;
   private boolean skipTag = false;
@@ -183,6 +186,9 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       keyEval = new ExprNodeEvaluator[keys.size()];
       int i = 0;
       for (ExprNodeDesc e : keys) {
+        if (e instanceof ExprNodeConstantDesc && ((ExprNodeConstantDesc) e).getValue().equals("_bucket_number")) {
+          buckColIdxInKeyForAcid = i;
+        }
         keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
       }
@@ -359,6 +365,9 @@ public void process(Object row, int tag) throws HiveException {
         // In the non-partitioned case we still want to compute the bucket number for updates and
         // deletes.
         bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
+        if (buckColIdxInKeyForAcid != -1) {
+          cachedKeys[0][buckColIdxInKeyForAcid] = new Text(String.valueOf(bucketNumber));
+        }
       }

       HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
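Note on the ReduceSinkOperator hunks: the optimizer (next diff) plants a constant "_bucket_number" among the key expressions; initializeOp records its index, and process() overwrites that key slot with the bucket number computed per row, so the shuffle sorts and routes by it. A self-contained sketch of the two steps (plain strings and arrays stand in for Hive's ExprNodeConstantDesc, Text, and cachedKeys):

    import java.util.Arrays;
    import java.util.List;

    public class BucketNumberKeySketch {
      public static void main(String[] args) {
        // Init time: find the planted "_bucket_number" key column.
        List<String> keyExprs = Arrays.asList("_col0", "_bucket_number", "_col3");
        int buckColIdxInKeyForAcid = -1;
        for (int i = 0; i < keyExprs.size(); i++) {
          if ("_bucket_number".equals(keyExprs.get(i))) { // constant-desc check stand-in
            buckColIdxInKeyForAcid = i;
          }
        }
        // Per row: patch the slot with the computed bucket number.
        Object[] cachedKey = { "rowid-struct", null, "part-col" };
        int bucketNumber = 2; // would come from computeBucketNumber(row, numBuckets)
        if (buckColIdxInKeyForAcid != -1) {
          cachedKey[buckColIdxInKeyForAcid] = String.valueOf(bucketNumber);
        }
        System.out.println(Arrays.toString(cachedKey)); // [rowid-struct, 2, part-col]
      }
    }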
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 8b4af72..926386b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -247,7 +247,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         }
       }
       RowSchema selRS = new RowSchema(fsParent.getSchema());
-      if (!bucketColumns.isEmpty()) {
+      if (!bucketColumns.isEmpty() || fsOp.getConf().getWriteType() == Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) {
         descs.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, ReduceField.KEY.toString()+".'"+BUCKET_NUMBER_COL_NAME+"'", null, false));
         colNames.add("'"+BUCKET_NUMBER_COL_NAME+"'");
         ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo, selRS.getSignature().get(0).getTabAlias(), true, true);
@@ -268,7 +268,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,

       // Set if partition sorted or partition bucket sorted
       fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
-      if (bucketColumns.size() > 0) {
+      if (bucketColumns.size() > 0 || fsOp.getConf().getWriteType() == Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) {
         fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_BUCKET_SORTED);
       }
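Note on the SortedDynPartitionOptimizer hunks: for ACID DELETE/UPDATE each row must be written through the updater for the bucket recorded in its ROW__ID, so the data must be bucket-clustered at the FileSink even when no bucket columns were collected; the condition is therefore widened and the plan forced to PARTITION_BUCKET_SORTED for those write types. A sketch of the widened predicate (the Operation enum here mirrors Hive's AcidUtils.Operation but is a local stand-in):

    import java.util.Collections;
    import java.util.List;

    public class SortStateSketch {
      enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }

      // Mirrors the widened condition: bucket-sort when bucket columns
      // exist, or whenever the write is an ACID delete/update.
      static boolean partitionBucketSorted(List<String> bucketColumns, Operation writeType) {
        return !bucketColumns.isEmpty()
            || writeType == Operation.DELETE
            || writeType == Operation.UPDATE;
      }

      public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        System.out.println(partitionBucketSorted(none, Operation.DELETE)); // true
        System.out.println(partitionBucketSorted(none, Operation.INSERT)); // false
      }
    }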
diff --git a/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out b/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
index 1838d6a..111ce18 100644
--- a/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
+++ b/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
@@ -422,8 +422,8 @@ STAGE PLANS:
           Statistics: Num rows: 892 Data size: 2676 Basic stats: COMPLETE Column stats: NONE
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: struct), 'foo' (type: string), 'bar' (type: string), KEY._col3 (type: string)
-          outputColumnNames: _col0, _col1, _col2, _col3
+          expressions: KEY._col0 (type: struct), 'foo' (type: string), 'bar' (type: string), KEY._col3 (type: string), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3, '_bucket_number'
           Statistics: Num rows: 892 Data size: 2676 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
@@ -1042,8 +1042,8 @@ STAGE PLANS:
           Statistics: Num rows: 1517 Data size: 4551 Basic stats: COMPLETE Column stats: NONE
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: struct), 'foo' (type: string), 'bar' (type: string), '2008-04-08' (type: string), KEY._col4 (type: int)
-          outputColumnNames: _col0, _col1, _col2, _col3, _col4
+          expressions: KEY._col0 (type: struct), 'foo' (type: string), 'bar' (type: string), '2008-04-08' (type: string), KEY._col4 (type: int), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, '_bucket_number'
           Statistics: Num rows: 1517 Data size: 4551 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
@@ -1152,8 +1152,8 @@ STAGE PLANS:
           Statistics: Num rows: 2979 Data size: 8937 Basic stats: COMPLETE Column stats: NONE
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: struct), KEY._col1 (type: string), KEY._col2 (type: int)
-          outputColumnNames: _col0, _col1, _col2
+          expressions: KEY._col0 (type: struct), KEY._col1 (type: string), KEY._col2 (type: int), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, '_bucket_number'
           Statistics: Num rows: 2979 Data size: 8937 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
@@ -1327,8 +1327,8 @@ STAGE PLANS:
             value expressions: _col1 (type: string), 'bar' (type: string)
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: struct), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: int)
-          outputColumnNames: _col0, _col1, _col2, _col3, _col4
+          expressions: KEY._col0 (type: struct), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: int), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, '_bucket_number'
           Statistics: Num rows: 23 Data size: 2322 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
@@ -1407,8 +1407,8 @@ STAGE PLANS:
             value expressions: _col1 (type: string), 'bar' (type: string)
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: struct), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: int)
-          outputColumnNames: _col0, _col1, _col2, _col3, _col4
+          expressions: KEY._col0 (type: struct), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: int), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, '_bucket_number'
           Statistics: Num rows: 45 Data size: 4550 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
-- 
1.7.12.4 (Apple Git-37)