diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index dfd790853b..323e152f4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -219,6 +219,8 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs,
       new ConstantPropagate(ConstantPropagateOption.SHORTCUT).transform(procCtx.parseContext);
     }
 
+    updateBucketingVersion(procCtx);
+
   }
 
   private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
@@ -1449,4 +1451,39 @@ private void markSemiJoinForDPP(OptimizeTezProcContext procCtx)
       }
     }
   }
+
+  /**
+   * Aligns the bucketing version of all ReduceSinkOperators in the plan.
+   *
+   * Every ReduceSinkOperator reachable from the top-level table scans is
+   * collected, the highest valid bucketing version (1 or 2) among them is
+   * selected, and that version is applied to each one. When none of them
+   * carries a valid version, all of them are set to -1.
+   *
+   * @param procCtx optimization context whose parse context supplies the top operators
+   */
+  private void updateBucketingVersion(OptimizeTezProcContext procCtx) {
+    // Collect every ReduceSinkOperator reachable from the top-level table scans.
+    Set<ReduceSinkOperator> reduceSinks = new HashSet<>();
+    for (TableScanOperator ts : procCtx.parseContext.getTopOps().values()) {
+      reduceSinks.addAll(OperatorUtils.findOperators(ts, ReduceSinkOperator.class));
+    }
+
+    // Select the highest valid version; values other than 1 or 2 are ignored.
+    int bucketVersion = -1;
+    for (ReduceSinkOperator rsOp : reduceSinks) {
+      int version = rsOp.getBucketingVersion();
+      if ((version == 1 || version == 2) && version > bucketVersion) {
+        bucketVersion = version;
+      }
+    }
+
+    for (ReduceSinkOperator rsOp : reduceSinks) {
+      // Read the version before overwriting it so the log shows the real previous value.
+      int oldVersion = rsOp.getBucketingVersion();
+      LOG.info("tez update reduceSinkOperator name = " + rsOp.getName() + ", opId = " + rsOp.getOperatorId()
+          + ", oldBucketVersion = " + oldVersion + ", newBucketVersion = " + bucketVersion);
+      rsOp.setBucketingVersion(bucketVersion);
+    }
+  }
 }