From c0774da927451008ba78ed7b8637a1a4899d9e12 Mon Sep 17 00:00:00 2001
From: luguangming
Date: Mon, 12 Aug 2019 14:24:05 +0800
Subject: [PATCH]HIVE-22098

---
 .../apache/hadoop/hive/ql/exec/mr/ExecMapper.java | 61 ++++++++++++++++++++++
 1 file changed, 61 insertions(+)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 99b33a3..d0c847e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -20,10 +20,13 @@
 
 import java.io.IOException;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -104,6 +107,10 @@ public void configure(JobConf job) {
       // initialize map operator
       mo.initialize(job, null);
       mo.setChildren(job);
+
+      // make every ReduceSinkOperator below the map operator use one bucketing version
+      balanceRSOpbucketVersion(mo);
+
       l4j.info(mo.dump(0));
       // initialize map local work
       localWork = mrwork.getMapRedLocalWork();
@@ -138,6 +145,60 @@ public void configure(JobConf job) {
       }
     }
   }
+
+  /**
+   * Aligns the bucketing version of every ReduceSinkOperator reachable from the given
+   * root operator, so that all of them end up with the same value.
+   *
+   * A version other than 1 or 2 is treated as unset. The highest valid version found
+   * is written to every collected operator; if none reports 1 or 2, all are set to -1.
+   *
+   * @param rootOp operator whose descendants are searched; rootOp itself is not checked
+   */
+  private static void balanceRSOpbucketVersion(Operator<?> rootOp) {
+    List<Operator<?>> rsOps = new ArrayList<>();
+    visitChildGetRSOps(rootOp, rsOps, new IdentityHashMap<Operator<?>, Boolean>());
+
+    int bucketVersion = -1;
+    for (Operator<?> rsOp : rsOps) {
+      int version = rsOp.getBucketingVersion();
+      if ((version == 1 || version == 2) && version > bucketVersion) {
+        bucketVersion = version;
+      }
+    }
+    for (Operator<?> rsOp : rsOps) {
+      l4j.info("update reduceSinkOperator name={}, opId={}, oldBucketVersion={}, "
+          + "newBucketVersion={}",
+          rsOp.getName(), rsOp.getOperatorId(), rsOp.getBucketingVersion(), bucketVersion);
+      rsOp.setBucketingVersion(bucketVersion);
+    }
+  }
+
+  /**
+   * Depth-first search below rootOp that appends each ReduceSinkOperator it finds to
+   * needDealOps, in discovery order.
+   *
+   * @param rootOp operator whose children are walked recursively
+   * @param needDealOps output list receiving the ReduceSinkOperators
+   * @param visited identity-keyed record of operators already seen, so an operator
+   *     reachable through several parents is collected and descended into only once
+   */
+  private static void visitChildGetRSOps(Operator<?> rootOp, List<Operator<?>> needDealOps,
+      Map<Operator<?>, Boolean> visited) {
+    List<? extends Operator<?>> children = rootOp.getChildOperators();
+    if (children == null) {
+      return;
+    }
+    for (Operator<?> child : children) {
+      if (visited.put(child, Boolean.TRUE) != null) {
+        continue;
+      }
+      if (child instanceof ReduceSinkOperator) {
+        needDealOps.add(child);
+      }
+      visitChildGetRSOps(child, needDealOps, visited);
+    }
+  }
   @Override
   public void map(Object key, Object value, OutputCollector output,
       Reporter reporter) throws IOException {
-- 
2.9.2