diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5a29684..72843ab 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1495,6 +1495,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVEIDENTITYPROJECTREMOVER("hive.optimize.remove.identity.project", true, "Removes identity project from operator tree"), HIVEMETADATAONLYQUERIES("hive.optimize.metadataonly", true, ""), HIVENULLSCANOPTIMIZE("hive.optimize.null.scan", true, "Dont scan relations which are guaranteed to not generate any rows"), + HIVEGLOBALSORTEDTABLEOPTIMIZE("hive.globalsortedtable.optimize", false, "Make distributed sorted table global sorted"), HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true, "Whether to push predicates down to storage handlers"), HIVEOPTGROUPBY("hive.optimize.groupby", true, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index c6c53cd..011a021 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -351,6 +351,7 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration MRHelpers.translateMRConfToTez(conf); String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); +// boolean test = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_GLOBAL_SORTED_TABLE, false); String partitionerClassName = conf.get("mapred.partitioner.class"); Map partitionerConf; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 25c4514..d126da5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -76,6 +76,7 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.json.JSONObject; /** @@ -388,6 +389,13 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, } else { // Regular vertices JobConf wxConf = utils.initializeVertexConf(conf, ctx, w); + if(w instanceof MapWork) { + if(((MapWork)w).isAddedBySortedTable()) + wxConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_GLOBAL_SORTED_TABLE, true); + }else if(w instanceof ReduceWork) { + if(((ReduceWork)w).isAddedBySortedTable()) + wxConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_GLOBAL_SORTED_TABLE, true); + } Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal, work, work.getVertexType(w)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index 5ee54b9..a332d4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -175,7 +175,8 @@ public void initialize(HiveConf hiveConf) { } if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEBUCKETINGSORTING)) { - transformations.add(new BucketingSortingReduceSinkOptimizer()); + if(!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGLOBALSORTEDTABLEOPTIMIZE)) + transformations.add(new BucketingSortingReduceSinkOptimizer()); } transformations.add(new UnionProcessor()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/DistributedSortedTableGloabalOrderDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/DistributedSortedTableGloabalOrderDispatcher.java new file mode 100644 index 0000000..6e125bf --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/DistributedSortedTableGloabalOrderDispatcher.java @@ -0,0 +1,82 @@ +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.optimizer.physical.DistributedSortedTableGlobalOrderOptimizer.WalkerCtx; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.UnionWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DistributedSortedTableGloabalOrderDispatcher implements Dispatcher { + + static final Logger LOG = LoggerFactory.getLogger(DistributedSortedTableGloabalOrderDispatcher.class.getName()); + private final PhysicalContext physicalContext; + private final Map rules; + + public DistributedSortedTableGloabalOrderDispatcher(PhysicalContext context, Map rules) { + super(); + physicalContext = context; + this.rules = rules; + } + + @Override + public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws SemanticException { + // TODO Auto-generated method stub + Task task = (Task) nd; + if (!(task instanceof TezTask)) + return null; + // create a the context for walking operators + TezWork work = (TezWork) task.getWork(); + List rss = physicalContext.getParseContext() + .getReduceSinkOperatorsAddedByEnforceBucketingSorting(); + for (BaseWork w : ((TezWork) task.getWork()).getAllWorkUnsorted()) { + if (w instanceof MapWork) { + Set> ops = w.getAllOperators(); + boolean candidate = false; + for (ReduceSinkOperator rs : rss) { + if (ops.contains(rs)) + candidate = true; + } + if (candidate) + ((MapWork) w).setAddedBySortedTable(true); + } else if (w instanceof ReduceWork) { + Set> ops = w.getAllOperators(); + boolean candidate = false; + for (ReduceSinkOperator rs : rss) { + if (ops.contains(rs)) + candidate = true; + } + if (candidate) + ((ReduceWork) w).setAddedBySortedTable(true); + } + } + return null; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/DistributedSortedTableGlobalOrderOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/DistributedSortedTableGlobalOrderOptimizer.java new file mode 100644 index 0000000..f81a340 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/DistributedSortedTableGlobalOrderOptimizer.java @@ -0,0 +1,104 @@ +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezWork; + +public class DistributedSortedTableGlobalOrderOptimizer implements PhysicalPlanResolver { + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1",".*"), + new GlobalOrderProcessor()); + WalkerCtx walkerctx = new WalkerCtx(pctx.getParseContext()); + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, walkerctx); + GraphWalker ogw = new PreOrderOnceWalker(disp); + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getRootTasks()); + ogw.startWalking(topNodes, null); + return pctx; + } + + static private class GlobalOrderProcessor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { + // TODO Auto-generated method stub + Task task = (Task) nd; + if (!(task instanceof TezTask)) + return null; + // create a the context for walking operators + TezWork work = (TezWork) task.getWork(); + assert(procCtx instanceof WalkerCtx); + List rss = ((WalkerCtx) procCtx).getParseContext() + .getReduceSinkOperatorsAddedByEnforceBucketingSorting(); + for (BaseWork w : ((TezWork) task.getWork()).getAllWorkUnsorted()) { + if (w instanceof MapWork) { + Set> ops = w.getAllOperators(); + boolean candidate = false; + for (ReduceSinkOperator rs : rss) { + if (ops.contains(rs)) + candidate = true; + } + if (candidate) + ((MapWork) w).setAddedBySortedTable(true); + } else if (w instanceof ReduceWork) { + Set> ops = w.getAllOperators(); + boolean candidate = false; + for (ReduceSinkOperator rs : rss) { + if (ops.contains(rs)) + candidate = true; + } + if (candidate) + ((ReduceWork) w).setAddedBySortedTable(true); + } + } + return null; + } + + } + + static class WalkerCtx implements NodeProcessorCtx { + ParseContext pctx; + public WalkerCtx(ParseContext pctx){ + this.pctx = pctx; + } + public ParseContext getParseContext(){ + return this.pctx; + + } + public void setParseContext(ParseContext pctx) { + this.pctx = pctx; + } + } +} + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 66a8322..d4d6f49 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism; import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits; import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck; +import org.apache.hadoop.hive.ql.optimizer.physical.DistributedSortedTableGlobalOrderOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider; import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider; import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer; @@ -471,6 +472,11 @@ protected void optimizeTaskPlan(List> rootTasks, Pa PhysicalContext physicalCtx = new PhysicalContext(conf, pCtx, pCtx.getContext(), rootTasks, pCtx.getFetchTask()); + if (conf.getBoolVar(HiveConf.ConfVars.HIVEGLOBALSORTEDTABLEOPTIMIZE)) { + physicalCtx = new DistributedSortedTableGlobalOrderOptimizer().resolve(physicalCtx); + }else { + LOG.debug("Skipping global sorted table optimization"); + } if (conf.getBoolVar(HiveConf.ConfVars.HIVENULLSCANOPTIMIZE)) { physicalCtx = new NullScanOptimizer().resolve(physicalCtx); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index e138c20..b2e4c58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -110,6 +110,8 @@ public static final int SAMPLING_ON_PREV_MR = 1; // todo HIVE-3841 public static final int SAMPLING_ON_START = 2; // sampling on task running + + private boolean addedbysortedtable = false; // the following two are used for join processing private boolean leftInputJoin; @@ -674,4 +676,12 @@ public void setVectorizedRowBatch(VectorizedRowBatch vectorizedRowBatch) { public VectorizedRowBatch getVectorizedRowBatch() { return vectorizedRowBatch; } + + public void setAddedBySortedTable(boolean b) { + this.addedbysortedtable = b; + } + + public boolean isAddedBySortedTable() { + return this.addedbysortedtable; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java index 72fc4ca..3b023fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java @@ -52,6 +52,8 @@ @SuppressWarnings({"serial", "deprecation"}) public class ReduceWork extends BaseWork { + private boolean addedbysortedtable = false; + public ReduceWork() {} public ReduceWork(String name) { @@ -252,4 +254,12 @@ public int getMaxReduceTasks() { public void setMaxReduceTasks(int maxReduceTasks) { this.maxReduceTasks = maxReduceTasks; } + + public void setAddedBySortedTable(boolean b) { + this.addedbysortedtable = b; + } + + public boolean isAddedBySortedTable() { + return this.addedbysortedtable; + } }