commit b61c5f0e6ab122d74c6c96d60c7b413fbe9ac691 Author: kellyzly Date: Tue Jan 23 23:47:09 2018 -0500 HIVE-18301.1.patch diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 05c2acd..954670a 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1662,7 +1662,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal "Probability with which a row will be chosen."), HIVE_REMOVE_ORDERBY_IN_SUBQUERY("hive.remove.orderby.in.subquery", true, "If set to true, order/sort by without limit in sub queries will be removed."), - HIVEOPTIMIZEDISTINCTREWRITE("hive.optimize.distinct.rewrite", true, "When applicable this " + HIVEOPTIMIZEDISTINCTREWRITE("hive.optimize.distinct.rewrite", true, "When applicable this " + "optimization rewrites distinct aggregates from a single stage to multi-stage " + "aggregation. This may not be optimal in all cases. Ideally, whether to trigger it or " + "not should be cost based decision. Until Hive formalizes cost model for this, this is config driven."), @@ -1715,6 +1715,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true, "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."), + HIVE_SPARK_SHARED_WORK_OPTIMIZATION("hive.spark.optimize.shared.work", false, + "Whether to enable the shared work optimizer for Spark. The optimizer finds scan operators over the same table\n" + + "and follow-up operators in the query plan and merges them if they meet some preconditions."), HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION("hive.optimize.shared.work.extended", true, "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" + "after a work boundary after shared work optimizer has been executed. 
Requires hive.optimize.shared.work\n" + diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 1017249..c3a0acb 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1588,7 +1588,8 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ # ql_rewrite_gbtoidx_cbo_1.q,\ # smb_mapjoin_8.q,\ -localSpark.only.query.files=spark_local_queries.q +localSpark.only.query.files=spark_local_queries.q,\ + spark_optimize_shared_work.q spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\ groupby2_multi_distinct.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java index d240d18..95ebaf4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.exec.spark; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.IOContextMap; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; @@ -29,7 +31,8 @@ import scala.Tuple2; import com.google.common.base.Preconditions; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MapInput implements SparkTran { @@ -37,6 +40,7 @@ private boolean toCache; private final SparkPlan sparkPlan; private String name = "MapInput"; + private static final Logger LOG = LoggerFactory.getLogger(MapInput.class.getName()); public MapInput(SparkPlan sparkPlan, JavaPairRDD hadoopRDD) { this(sparkPlan, hadoopRDD, false); @@ -79,10 +83,19 @@ public void setToCache(boolean toCache) { call(Tuple2 tuple) throws Exception { if (conf == null) { conf = new Configuration(); + conf.set("hive.execution.engine", "spark"); } - - return new Tuple2(tuple._1(), - WritableUtils.clone(tuple._2(), conf)); + // In theory, + // CopyFunction MapFunction + // HADOOPRDD-----------------> RDD1-------------> RDD2..... + // these transformations are in one stage and will be executed by a single Spark task (thread), + // so IOContextMap.get(conf).getInputPath() will not be null. 
+ // if( IOContextMap.get(conf).getInputPath()!= null) { + String inputPath = IOContextMap.get(conf).getInputPath().toString(); + Text inputPathText = new Text(inputPath); + return new Tuple2(inputPathText, + WritableUtils.clone(tuple._2(), conf)); + //} } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java index 7cd853f..7475e37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java @@ -22,6 +22,10 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.IOContextMap; +import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -129,7 +133,13 @@ public void processRow(Object key, Object value) throws IOException { } // reset the execContext for each new row execContext.resetRow(); - + if (HiveConf.getBoolVar(jc, HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + Path inputPath = IOContextMap.get(jc).getInputPath(); + if (inputPath == null) { + Text pathText = (Text) key; + IOContextMap.get(jc).setInputPath(new Path(pathText.toString())); + } + } try { // Since there is no concept of a group, we don't invoke // startGroup/endGroup for a mapper diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index c52692d..95be6e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -24,6 +24,9 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.optimizer.SharedTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +81,7 @@ private final Map workToParentWorkTranMap; // a map from each BaseWork to its cloned JobConf private final Map workToJobConf; + private final Map<TableScanOperator, MapInput> tsMapInputMap; public SparkPlanGenerator( JavaSparkContext sc, @@ -94,6 +98,7 @@ public SparkPlanGenerator( this.workToParentWorkTranMap = new HashMap(); this.sparkReporter = sparkReporter; this.workToJobConf = new HashMap(); + this.tsMapInputMap = new HashMap<>(); } public SparkPlan generate(SparkWork sparkWork) throws Exception { @@ -135,7 +140,7 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, SparkTran result; if (work instanceof MapWork) { - result = generateMapInput(sparkPlan, (MapWork)work); + result = generateMapInput(sparkPlan, (MapWork) work, isCachingWork(work, sparkWork), sparkWork); sparkPlan.addTran(result); } else if (work instanceof ReduceWork) { List parentWorks = sparkWork.getParents(work); @@ -184,25 +189,75 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, } @SuppressWarnings("unchecked") - private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) - throws Exception { + private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached, SparkWork sparkWork) + throws Exception { + LOG.info("MapWork: " + mapWork.getName() + " cached: " + cached); + Map<TableScanOperator, TableScanOperator> sharedTableMap = 
sparkWork.getSharedTableMap(); + if (LOG.isDebugEnabled()) { + print(sharedTableMap); + } JobConf jobConf = cloneJobConf(mapWork); + MapInput mapInput = null; + if (!jobConf.getBoolean(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION.varname, false)) { + mapInput = createMapInput(jobConf, sparkPlan, mapWork, false); + } else { + Operator operator = mapWork.getAllRootOperators().iterator().next(); + TableScanOperator ts = (TableScanOperator) operator; + if (!sharedTableMap.containsKey(ts)) { + boolean needCache = needCache(ts, sharedTableMap); + mapInput = createMapInput(jobConf, sparkPlan, mapWork, needCache); + if (needCache) { + tsMapInputMap.put(ts, mapInput); + } + } else { + TableScanOperator retainedTs = sharedTableMap.get(ts); + + if (tsMapInputMap.containsKey(retainedTs)) { + mapInput = tsMapInputMap.get(retainedTs); + } else { + // It is unlikely to reach this branch. + LOG.info("Cannot find the cached MapInput of the retained TableScanOperator"); + mapInput = createMapInput(jobConf, sparkPlan, mapWork, false); + } + } + } + return mapInput; + } + + private void print(Map<TableScanOperator, TableScanOperator> sharedTableMap) { + for (TableScanOperator key : sharedTableMap.keySet()) { + LOG.debug("key: " + key.getOperatorId() + " value: " + sharedTableMap.get(key).getOperatorId()); + } + } + + private MapInput createMapInput(JobConf jobConf, SparkPlan sparkPlan, MapWork mapWork, boolean cached) throws Exception { Class ifClass = getInputFormat(jobConf, mapWork); JavaPairRDD hadoopRDD; if (mapWork.getNumMapTasks() != null) { jobConf.setNumMapTasks(mapWork.getNumMapTasks()); hadoopRDD = sc.hadoopRDD(jobConf, ifClass, - WritableComparable.class, Writable.class, mapWork.getNumMapTasks()); + WritableComparable.class, Writable.class, mapWork.getNumMapTasks()); } else { hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); } // Caching is disabled for MapInput due to HIVE-8920 - MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + //MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + MapInput result = new MapInput(sparkPlan, hadoopRDD, cached); return result; } + private boolean needCache(TableScanOperator ts, Map<TableScanOperator, TableScanOperator> sharedTables) { + for (TableScanOperator key : sharedTables.keySet()) { + if (sharedTables.get(key).getOperatorId().equals(ts.getOperatorId())) { + return true; + } + } + return false; + } + private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) { Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index c6e17b5..734d8d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -30,6 +30,7 @@ import com.google.common.base.Throwables; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.ql.optimizer.SharedTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -110,7 +111,7 @@ public int execute(DriverContext driverContext) { SparkWork sparkWork = getWork(); sparkWork.setRequiredCounterPrefix(getOperatorCounters()); - + 
sparkWork.setSharedTableMap(SharedTable.getInstance().getSharedTables()); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB); jobRef = sparkSession.submit(driverContext, sparkWork); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java new file mode 100644 index 0000000..3cea673 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java @@ -0,0 +1,33 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.Operator; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +public class SharedResult { + public final List> retainableOps; + public final List> discardableOps; + public final Set> discardableInputOps; + final long dataSize; + final long maxDataSize; + + public SharedResult(Collection> retainableOps, Collection> discardableOps, + Set> discardableInputOps, long dataSize, long maxDataSize) { + this.retainableOps = ImmutableList.copyOf(retainableOps); + this.discardableOps = ImmutableList.copyOf(discardableOps); + this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps); + this.dataSize = dataSize; + this.maxDataSize = maxDataSize; + } + + @Override + public String toString() { + return "SharedResult { " + this.retainableOps + "; " + this.discardableOps + "; " + + this.discardableInputOps + "};"; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java new file mode 100644 index 0000000..93ed133 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.TableScanOperator; + +import java.util.HashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Store TS relationship, used in SparkSharedWorkOptimizer + */ +public class SharedTable { + private static final Logger LOG = LoggerFactory.getLogger(SharedTable.class); + private static SharedTable instance = null; + private HashMap sharedTables = new HashMap<>(); + + public static SharedTable getInstance() { + if (instance == null) { + instance = new SharedTable(); + } + return instance; + } + + public HashMap getSharedTables() { + return sharedTables; + } + + public void addSharedTable(TableScanOperator op1, TableScanOperator op2) { + this.sharedTables.put(op1,op2); + } + + + public void print() { + + for(TableScanOperator key: sharedTables.keySet()){ + LOG.info("key:"+key.getOperatorId()+ " value:"+ sharedTables.get(key).getOperatorId()); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index b0cf3bd..27bdb2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -35,6 +35,7 @@ import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.GenTezUtils; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; @@ -564,7 +566,7 @@ private static void gatherDPPTableScanOps( LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); } - private static Multimap splitTableScanOpsByTable( + public static Multimap splitTableScanOpsByTable( ParseContext pctx) { Multimap tableNameToOps = ArrayListMultimap.create(); // Sort by operator ID so we get deterministic results @@ -578,7 +580,7 @@ private static void gatherDPPTableScanOps( return tableNameToOps; } - private static List> rankTablesByAccumulatedSize(ParseContext pctx) { + public static List> rankTablesByAccumulatedSize(ParseContext pctx) { Map tableToTotalSize = new HashMap<>(); for (Entry e : pctx.getTopOps().entrySet()) { TableScanOperator tsOp = e.getValue(); @@ -648,7 +650,7 @@ public int compare(Map.Entry, Long> o1, Map.Entry, Long> return sortedOps; } - private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, + public static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException { // First we check if the two table scan operators can actually be merged // If schemas do not match, we currently do not merge @@ -732,7 +734,7 @@ private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache return true; } - private static SharedResult extractSharedOptimizationInfoForRoot(ParseContext pctx, + public static SharedResult 
extractSharedOptimizationInfoForRoot(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator retainableTsOp, TableScanOperator discardableTsOp) throws SemanticException { @@ -874,10 +876,18 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, equalOp2 = currentOp2; retainableOps.add(equalOp1); discardableOps.add(equalOp2); + long noconditionalTaskSize = pctx.getConf().getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); if (equalOp1 instanceof MapJoinOperator) { MapJoinOperator mop = (MapJoinOperator) equalOp1; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } if (currentOp1.getChildOperators().size() > 1 || currentOp2.getChildOperators().size() > 1) { @@ -891,11 +901,19 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, // Add the rest to the memory consumption Set> opsWork1 = findWorkOperators(optimizerCache, currentOp1); + long noconditionalTaskSize = pctx.getConf().getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); for (Operator op : opsWork1) { if (op instanceof MapJoinOperator && !retainableOps.contains(op)) { MapJoinOperator mop = (MapJoinOperator) op; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } } Set> opsWork2 = findWorkOperators(optimizerCache, currentOp2); @@ -903,7 +921,13 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, if (op instanceof MapJoinOperator && !discardableOps.contains(op)) { MapJoinOperator mop = (MapJoinOperator) op; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } } @@ -1084,7 +1108,7 @@ private static boolean compareOperator(ParseContext pctx, Operator op1, Opera return op1.logicalEquals(op2); } - private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, + public static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, SharedResult sr) { // We check whether merging the works would cause the size of @@ -1313,6 +1337,9 @@ private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizer // to finish 
for it to execute set.addAll(findWorkOperators( optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan())); + } else if(op.getConf() instanceof SparkPartitionPruningSinkDesc) { + set.addAll(findWorkOperators( + optimizerCache, ((SparkPartitionPruningSinkDesc) op.getConf()).getTableScan())); } } workOps = set; @@ -1360,7 +1387,7 @@ private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizer return found; } - private static void pushFilterToTopOfTableScan( + public static void pushFilterToTopOfTableScan( SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp) throws UDFArgumentException { ExprNodeGenericFuncDesc tableScanExprNode = tsOp.getConf().getFilterExpr(); @@ -1401,85 +1428,5 @@ private static void pushFilterToTopOfTableScan( } } - private static class SharedResult { - final List> retainableOps; - final List> discardableOps; - final Set> discardableInputOps; - final long dataSize; - final long maxDataSize; - - private SharedResult(Collection> retainableOps, Collection> discardableOps, - Set> discardableInputOps, long dataSize, long maxDataSize) { - this.retainableOps = ImmutableList.copyOf(retainableOps); - this.discardableOps = ImmutableList.copyOf(discardableOps); - this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps); - this.dataSize = dataSize; - this.maxDataSize = maxDataSize; - } - - @Override - public String toString() { - return "SharedResult { " + this.retainableOps + "; " + this.discardableOps + "; " - + this.discardableInputOps + "};"; - } - } - - /** Cache to accelerate optimization */ - private static class SharedWorkOptimizerCache { - // Operators that belong to each work - final HashMultimap, Operator> operatorToWorkOperators = - HashMultimap., Operator>create(); - // Table scan operators to DPP sources - final Multimap> tableScanToDPPSource = - HashMultimap.>create(); - - // Add new operator to cache work group of existing operator (if group exists) - void putIfWorkExists(Operator opToAdd, Operator existingOp) { - List> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp)); - if (!c.isEmpty()) { - for (Operator op : c) { - operatorToWorkOperators.get(op).add(opToAdd); - } - operatorToWorkOperators.putAll(opToAdd, c); - operatorToWorkOperators.put(opToAdd, opToAdd); - } - } - - // Remove operator - void removeOp(Operator opToRemove) { - Set> s = operatorToWorkOperators.get(opToRemove); - s.remove(opToRemove); - List> c1 = ImmutableList.copyOf(s); - if (!c1.isEmpty()) { - for (Operator op1 : c1) { - operatorToWorkOperators.remove(op1, opToRemove); // Remove operator - } - operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator - } - } - - // Remove operator and combine - void removeOpAndCombineWork(Operator opToRemove, Operator replacementOp) { - Set> s = operatorToWorkOperators.get(opToRemove); - s.remove(opToRemove); - List> c1 = ImmutableList.copyOf(s); - List> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp)); - if (!c1.isEmpty() && !c2.isEmpty()) { - for (Operator op1 : c1) { - operatorToWorkOperators.remove(op1, opToRemove); // Remove operator - operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection - } - operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator - for (Operator op2 : c2) { - operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection - } - } - } - - @Override - public String toString() { - return "SharedWorkOptimizerCache { \n" + operatorToWorkOperators.toString() + "\n };"; - } - } } diff 
--git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java new file mode 100644 index 0000000..569a704 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; + +import java.util.List; +import java.util.Set; + +/** + * Cache to accelerate optimization + */ +public class SharedWorkOptimizerCache { + // Operators that belong to each work + final HashMultimap, Operator> operatorToWorkOperators = + HashMultimap., Operator>create(); + // Table scan operators to DPP sources + public final Multimap> tableScanToDPPSource = + HashMultimap.>create(); + + // Add new operator to cache work group of existing operator (if group exists) + void putIfWorkExists(Operator opToAdd, Operator existingOp) { + List> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp)); + if (!c.isEmpty()) { + for (Operator op : c) { + operatorToWorkOperators.get(op).add(opToAdd); + } + operatorToWorkOperators.putAll(opToAdd, c); + operatorToWorkOperators.put(opToAdd, opToAdd); + } + } + + // Remove operator + public void removeOp(Operator opToRemove) { + Set> s = operatorToWorkOperators.get(opToRemove); + s.remove(opToRemove); + List> c1 = ImmutableList.copyOf(s); + if (!c1.isEmpty()) { + for (Operator op1 : c1) { + operatorToWorkOperators.remove(op1, opToRemove); // Remove operator + } + operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator + } + } + + // Remove operator and combine + public void removeOpAndCombineWork(Operator opToRemove, Operator replacementOp) { + Set> s = operatorToWorkOperators.get(opToRemove); + s.remove(opToRemove); + List> c1 = ImmutableList.copyOf(s); + List> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp)); + if (!c1.isEmpty() && !c2.isEmpty()) { + for (Operator op1 : c1) { + operatorToWorkOperators.remove(op1, opToRemove); // Remove operator + operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection + } + operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator + for (Operator op2 : c2) { + operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection + } + } + } + + @Override + public String toString() { + return "SharedWorkOptimizerCache { \n" + operatorToWorkOperators.toString() + "\n };"; + } +} + diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index bacc444..7b66144 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -99,6 +99,9 @@ LOG.info("Convert to non-bucketed map join"); MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); + //Refer tez code ConvertJoinMapJoin.estimateNumBuckets, if numBuckets <0, then defaultNumBucket is 1 + int defaultNumBucket = 1; + mapJoinOp.getConf().setInMemoryDataSize(mapJoinInfo[2] / defaultNumBucket); // For native vectorized map join, we require the key SerDe to be BinarySortableSerDe // Note: the MJ may not really get natively-vectorized later, // but changing SerDe won't hurt correctness @@ -115,6 +118,8 @@ LOG.info("Converted to map join with " + numBuckets + " buckets"); bucketColNames = joinOp.getOpTraits().getBucketColNames(); mapJoinInfo[2] /= numBuckets; + //Refer tez code in ConvertJoinMapJoin.getMapJoinConversionPos() + mapJoinOp.getConf().setInMemoryDataSize(mapJoinInfo[2]); } else { LOG.info("Can not convert to bucketed map join"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java new file mode 100644 index 0000000..40c4420 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.spark; + +import com.google.common.collect.Lists; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.optimizer.SharedResult; +import org.apache.hadoop.hive.ql.optimizer.SharedTable; +import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer; +import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizerCache; +import org.apache.hadoop.hive.ql.optimizer.Transform; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; +import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ArrayListMultimap; +/** + * Shared computation optimizer. + * + *

Originally, this rule would find scan operators over the same table + * in the query plan and merge them if they met some preconditions. + * + * TS TS TS + * | | -> / \ + * Op Op Op Op + * + *

A limitation in the current implementation is that the optimizer does not + * go beyond a work boundary. + * + *

The optimization works with the Spark execution engine. + **/ + +public class SparkSharedWorkOptimizer extends Transform { + + private final static Logger LOG = LoggerFactory.getLogger(SparkSharedWorkOptimizer.class); + + private HashMap replaceFilterTSMap = new HashMap(); + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + final Map topOps = pctx.getTopOps(); + if (topOps.size() < 2) { + // Nothing to do, bail out + return pctx; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Before SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + // Cache to use during optimization + SharedWorkOptimizerCache optimizerCache = new SharedWorkOptimizerCache(); + + // Gather information about the DPP table scans and store it in the cache + gatherDPPTableScanOps(pctx, optimizerCache); + + // Map of dbName.TblName -> TSOperator + Multimap tableNameToOps = SharedWorkOptimizer.splitTableScanOpsByTable(pctx); + + // We enforce a certain order when we do the reutilization. + // In particular, we use size of table x number of reads to + // rank the tables. + List> sortedTables = SharedWorkOptimizer.rankTablesByAccumulatedSize(pctx); + LOG.debug("Sorted tables by size: {}", sortedTables); + + // Execute optimization + Multimap existingOps = ArrayListMultimap.create(); + Set> removedOps = new HashSet<>(); + for (Map.Entry tablePair : sortedTables) { + String tableName = tablePair.getKey(); + for (TableScanOperator discardableTsOp : tableNameToOps.get(tableName)) { + if (removedOps.contains(discardableTsOp)) { + LOG.debug("Skip {} as it has been already removed", discardableTsOp); + continue; + } + Collection prevTsOps = existingOps.get(tableName); + for (TableScanOperator retainableTsOp : prevTsOps) { + if (removedOps.contains(retainableTsOp)) { + LOG.debug("Skip {} as it has been already removed", retainableTsOp); + continue; + } + + // First we quickly check if the two table scan operators can actually be merged + boolean mergeable = SharedWorkOptimizer.areMergeable(pctx, optimizerCache, retainableTsOp, discardableTsOp); + if (!mergeable) { + // Skip + LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp); + continue; + } + + // Secondly, we extract information about the part of the tree that can be merged + // as well as some structural information (memory consumption) that needs to be + // used to determined whether the merge can happen + SharedResult sr = SharedWorkOptimizer.extractSharedOptimizationInfoForRoot( + pctx, optimizerCache, retainableTsOp, discardableTsOp); + + // It seems these two operators can be merged. + // Check that plan meets some preconditions before doing it. + // In particular, in the presence of map joins in the upstream plan: + // - we cannot exceed the noconditional task size, and + // - if we already merged the big table, we cannot merge the broadcast + // tables. +// if (!SharedWorkOptimizer.validPreConditions(pctx, optimizerCache, sr)) { +// // Skip +// LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp); +// continue; +// } + + // We can merge + if (sr.retainableOps.size() > 1) { + //For hive on tez, this feature may will merge two branches if more than 1 operator(TS) are same in the two + //branches. but for hive on spark, currently this is no need( not support). 
+ +// // More than TS operator +// Operator lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1); +// Operator lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1); +// if (lastDiscardableOp.getNumChild() != 0) { +// List> allChildren = +// Lists.newArrayList(lastDiscardableOp.getChildOperators()); +// for (Operator op : allChildren) { +// lastDiscardableOp.getChildOperators().remove(op); +// op.replaceParent(lastDiscardableOp, lastRetainableOp); +// lastRetainableOp.getChildOperators().add(op); +// } +// } + +// LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); + } else { + // Only TS operator + ExprNodeGenericFuncDesc exprNode = null; + if (retainableTsOp.getConf().getFilterExpr() != null) { + // Push filter on top of children + SharedWorkOptimizer.pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + // Clone to push to table scan + exprNode = (ExprNodeGenericFuncDesc) retainableTsOp.getConf().getFilterExpr(); + } + if (discardableTsOp.getConf().getFilterExpr() != null) { + // Push filter on top + SharedWorkOptimizer.pushFilterToTopOfTableScan(optimizerCache, discardableTsOp); + ExprNodeGenericFuncDesc tsExprNode = discardableTsOp.getConf().getFilterExpr(); + if (exprNode != null && !exprNode.isSame(tsExprNode)) { + // We merge filters from previous scan by ORing with filters from current scan + if (exprNode.getGenericUDF() instanceof GenericUDFOPOr) { + List newChildren = new ArrayList<>(exprNode.getChildren().size() + 1); + for (ExprNodeDesc childExprNode : exprNode.getChildren()) { + if (childExprNode.isSame(tsExprNode)) { + // We do not need to do anything, it is in the OR expression + break; + } + newChildren.add(childExprNode); + } + if (exprNode.getChildren().size() == newChildren.size()) { + newChildren.add(tsExprNode); + exprNode = ExprNodeGenericFuncDesc.newInstance( + new GenericUDFOPOr(), + newChildren); + } + } else { + exprNode = ExprNodeGenericFuncDesc.newInstance( + new GenericUDFOPOr(), + Arrays.asList(exprNode, tsExprNode)); + } + } + } + // Replace filter + //TODO: understand why filters are not combined when there is more than 1 operator(TS) are same + //in the two branch + retainableTsOp.getConf().setFilterExpr(exprNode); + replaceFilterTSMap.put(discardableTsOp, retainableTsOp); + LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp); + } + + LOG.info("SharedTable.getInstance().addSharedTable< "+discardableTsOp.getOperatorId()+ " ,"+retainableTsOp.getOperatorId()+" >"); + SharedTable.getInstance().addSharedTable(discardableTsOp,retainableTsOp); + // First we remove the input operators of the expression that + // we are going to eliminate + for (Operator op : sr.discardableInputOps) { + //TODO Verify we need optimizerCache.removeOp(op) + optimizerCache.removeOp(op); + removedOps.add(op); + // Remove DPP predicates + if (op instanceof ReduceSinkOperator) { + SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op); + if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) && + !sr.discardableInputOps.contains(sjbi.getTsOp())) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator( +// pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); + } + } else if (op instanceof AppMasterEventOperator) { + DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + //TODO To find similar code in Spark +// 
GenTezUtils.removeSemiJoinOperator( +// pctx, (AppMasterEventOperator) op, dped.getTableScan()); + } + } + LOG.debug("Input operator removed: {}", op); + } + + removedOps.add(discardableTsOp); + // Finally we remove the expression from the tree + for (Operator op : sr.discardableOps) { + //TODO Verify we need optimizerCache.removeOp(op) + optimizerCache.removeOp(op); + removedOps.add(op); + if (sr.discardableOps.size() == 1) { + // If there is a single discardable operator, it is a TableScanOperator + // and it means that we have merged filter expressions for it. Thus, we + // might need to remove DPP predicates from the retainable TableScanOperator + Collection> c = + optimizerCache.tableScanToDPPSource.get((TableScanOperator) op); + for (Operator dppSource : c) { + if (dppSource instanceof ReduceSinkOperator) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator(pctx, +// (ReduceSinkOperator) dppSource, +// (TableScanOperator) sr.retainableOps.get(0)); + } else if (dppSource instanceof AppMasterEventOperator) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator(pctx, +// (AppMasterEventOperator) dppSource, +// (TableScanOperator) sr.retainableOps.get(0)); + } + } + } + LOG.debug("Operator removed: {}", op); + } + + break; + } + + if (removedOps.contains(discardableTsOp)) { + // This operator has been removed, remove it from the list of existing operators + existingOps.remove(tableName, discardableTsOp); + } else { + // This operator has not been removed, include it in the list of existing operators + existingOps.put(tableName, discardableTsOp); + } + } + } + + // Remove unused table scan operators + Iterator> it = topOps.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + if (e.getValue().getNumChild() == 0) { + it.remove(); + } + } + + //combine all conditions of TS to 1 condition + for(TableScanOperator discardableTsOp:replaceFilterTSMap.keySet()){ + TableScanOperator retainedTsOp = replaceFilterTSMap.get(discardableTsOp); + if( retainedTsOp.getConf().getFilterExpr()!= null) { + discardableTsOp.getConf().setFilterExpr( + retainedTsOp.getConf().getFilterExpr()); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + + return pctx; + } + + private static void gatherDPPTableScanOps( + ParseContext pctx, SharedWorkOptimizerCache optimizerCache) throws SemanticException { + // Find TS operators with partition pruning enabled in plan + // because these TS may potentially read different data for + // different pipeline. + // These can be: + // 1) TS with DPP. + // 2) TS with semijoin DPP. 
+ Map topOps = pctx.getTopOps(); + Collection> tableScanOps = + Lists.>newArrayList(topOps.values()); + Set s = + OperatorUtils.findOperators(tableScanOps, SparkPartitionPruningSinkOperator.class); + for (SparkPartitionPruningSinkOperator a : s) { + if (a.getConf() instanceof SparkPartitionPruningSinkDesc) { + SparkPartitionPruningSinkDesc dped = (SparkPartitionPruningSinkDesc) a.getConf(); + optimizerCache.tableScanToDPPSource.put(dped.getTableScan(), a); + } + } + for (Map.Entry e + : pctx.getRsToSemiJoinBranchInfo().entrySet()) { + optimizerCache.tableScanToDPPSource.put(e.getValue().getTsOp(), e.getKey()); + } + LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 5220281..a3af1c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkSharedWorkOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver; import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver; import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics; @@ -143,6 +144,10 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, new ConstantPropagate(ConstantPropagateProcCtx.ConstantPropagateOption.SHORTCUT).transform(pCtx); } + if (procCtx.getParseContext().getConf().getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + new SparkSharedWorkOptimizer().transform(procCtx.getParseContext()); + } + PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java index 3ed5cb2..e9fe807 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java @@ -34,6 +34,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -67,6 +68,8 @@ private Map cloneToWork; + private Map sharedTableMap; + public SparkWork(String queryId) { this.queryId = queryId; this.dagName = queryId + ":" + counter.getAndIncrement(); @@ -434,4 +437,12 @@ public String toString() { public void setCloneToWork(Map cloneToWork) { this.cloneToWork = cloneToWork; } + + public Map getSharedTableMap() { + return sharedTableMap; + } + + public void setSharedTableMap(Map sharedTableMap) { + this.sharedTableMap = sharedTableMap; + } }
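
Taken together, the patch wires the optimization through three stages: at compile time SparkSharedWorkOptimizer records each discardable TableScanOperator together with the TableScanOperator that replaces it in the SharedTable singleton; SparkTask copies that map into SparkWork before job submission; and SparkPlanGenerator consults the map so that the MapInput built for the retained scan is cached and reused by every MapWork whose scan was merged away. The following is a minimal, self-contained sketch of that reuse decision only; it uses hypothetical stand-ins (plain String operator ids, a String in place of the HadoopRDD/MapInput pair) rather than the patch's real classes.

import java.util.HashMap;
import java.util.Map;

// Minimal sketch (hypothetical types) of the MapInput-reuse decision made in SparkPlanGenerator.
public class SharedScanReuseSketch {
  // discardable TS id -> retained TS id, as recorded by SparkSharedWorkOptimizer through SharedTable
  private final Map<String, String> sharedTableMap;
  // retained TS id -> input built for it, standing in for tsMapInputMap (TableScanOperator -> MapInput)
  private final Map<String, String> cachedInputs = new HashMap<>();

  SharedScanReuseSketch(Map<String, String> sharedTableMap) {
    this.sharedTableMap = sharedTableMap;
  }

  // Returns the input for a table scan, reusing the retained scan's cached input when the scan was merged.
  String inputFor(String tsId) {
    String retained = sharedTableMap.get(tsId);
    if (retained != null && cachedInputs.containsKey(retained)) {
      return cachedInputs.get(retained);          // merged scan: reuse the cached input of the retained scan
    }
    String input = "RDD(" + tsId + ")";           // stands in for sc.hadoopRDD(...) wrapped in a MapInput
    if (sharedTableMap.containsValue(tsId)) {     // some other scan was merged into this one
      cachedInputs.put(tsId, input);              // cache it so later MapWorks can reuse it
    }
    return input;
  }

  public static void main(String[] args) {
    Map<String, String> shared = new HashMap<>();
    shared.put("TS[4]", "TS[0]");                 // TS[4] was merged into TS[0]
    SharedScanReuseSketch generator = new SharedScanReuseSketch(shared);
    System.out.println(generator.inputFor("TS[0]"));  // builds and caches RDD(TS[0])
    System.out.println(generator.inputFor("TS[4]"));  // prints RDD(TS[0]) again: the cached input is reused
  }
}

Because the reused input is built once and cached, the per-split IOContext is not populated for the works that share it; that is why MapInput's copy function keys each tuple by its input path and SparkMapRecordHandler restores the path into IOContextMap when hive.spark.optimize.shared.work is enabled.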