From 836ed204fefeef9fcf257bd21bdd7bca1908089f Mon Sep 17 00:00:00 2001 From: kellyzly Date: Mon, 25 Dec 2017 16:43:46 +0800 Subject: [PATCH 01/11] HIVE-17486.4 --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 + .../test/resources/testconfiguration.properties | 5 +- .../hadoop/hive/ql/exec/mr/ExecMapperContext.java | 16 + .../hive/ql/exec/spark/SparkPlanGenerator.java | 50 +++- .../hadoop/hive/ql/optimizer/SharedResult.java | 27 ++ .../hadoop/hive/ql/optimizer/SharedTable.java | 48 +++ .../hive/ql/optimizer/SharedWorkOptimizer.java | 155 ++++------ .../ql/optimizer/SharedWorkOptimizerCache.java | 82 +++++ .../ql/optimizer/spark/SparkMapJoinOptimizer.java | 6 + .../optimizer/spark/SparkSharedWorkOptimizer.java | 330 +++++++++++++++++++++ .../hadoop/hive/ql/parse/spark/SparkCompiler.java | 30 +- .../hive/ql/parse/spark/SparkRuleDispatcher.java | 94 ++++++ 12 files changed, 743 insertions(+), 103 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 711dfbd..8fae810 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1695,6 +1695,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true, "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."), + HIVE_SPARK_SHARED_WORK_OPTIMIZATION("hive.spark.optimize.shared.work", false, + "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + + "and follow-up operators in the query plan and merges them if they meet some preconditions."), HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " + "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " + "work objects and combines them if they meet certain preconditions. 
Spark only."), diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 2bf64dc..11696a0 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1516,7 +1516,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\ spark_use_ts_stats_for_mapjoin.q,\ spark_use_op_stats.q,\ spark_explain_groupbyshuffle.q,\ - spark_opt_shuffle_serde.q + spark_opt_shuffle_serde.q, miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ bucket4.q,\ @@ -1583,7 +1583,8 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ # ql_rewrite_gbtoidx_cbo_1.q,\ # smb_mapjoin_8.q,\ -localSpark.only.query.files=spark_local_queries.q +localSpark.only.query.files=spark_local_queries.q,\ + spark_optimize_shared_work.q spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\ groupby2_multi_distinct.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java index 8f397fa..ad8a1be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.mr; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FetchOperator; @@ -28,6 +30,7 @@ public class ExecMapperContext { + private static final Logger LOG = LoggerFactory.getLogger(ExecMapperContext.class); // lastInputPath should be changed by the root of the operator tree ExecMapper.map() // but kept unchanged throughout the operator tree for one row private Path lastInputPath = null; @@ -107,10 +110,22 @@ public void setLastInputPath(Path lastInputPath) { public Path getCurrentInputPath() { currentInputPath = this.ioCxt.getInputPath(); + for (StackTraceElement ste : Thread.currentThread().getStackTrace()) { + LOG.info(ste.toString()); + } + + LOG.info("currentInputPath is "+currentInputPath); + return currentInputPath; } public void setCurrentInputPath(Path currentInputPath) { + for (StackTraceElement ste : Thread.currentThread().getStackTrace()) { + LOG.info(ste.toString()); + } + + LOG.info("setCurrentInputPath is "+currentInputPath); + this.currentInputPath = currentInputPath; } @@ -148,4 +163,5 @@ public void setFetchOperators(Map fetchOperators) { public IOContext getIoCxt() { return ioCxt; } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 079ec42..bc1148e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.optimizer.SharedTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +80,7 @@ private final Map workToParentWorkTranMap; // a map from each BaseWork to its cloned JobConf private final Map workToJobConf; + private final Map tsMapInputMap; public SparkPlanGenerator( JavaSparkContext sc, @@ -94,6 +97,7 @@ public SparkPlanGenerator( this.workToParentWorkTranMap = new HashMap(); this.sparkReporter = sparkReporter; this.workToJobConf = new HashMap(); + 
this.tsMapInputMap = new HashMap<>(); } public SparkPlan generate(SparkWork sparkWork) throws Exception { @@ -135,7 +139,7 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, SparkTran result; if (work instanceof MapWork) { - result = generateMapInput(sparkPlan, (MapWork)work); + result = generateMapInput(sparkPlan, (MapWork)work, isCachingWork(work, sparkWork)); sparkPlan.addTran(result); } else if (work instanceof ReduceWork) { List parentWorks = sparkWork.getParents(work); @@ -184,25 +188,61 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, } @SuppressWarnings("unchecked") - private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) - throws Exception { + private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached) + throws Exception { + LOG.info("MapWork :" + mapWork.getName() + " cached:" + cached); JobConf jobConf = cloneJobConf(mapWork); + MapInput mapInput = null; + if (jobConf.getBoolean(HiveConf.ConfVars.PLAN.HIVE_SPARK_SHARED_WORK_OPTIMIZATION.varname, false) == false) { + mapInput = createMapInput(sparkPlan, mapWork, false); + } else { + Operator operator = mapWork.getAllRootOperators().iterator().next(); + TableScanOperator ts = (TableScanOperator) operator; + if (!SharedTable.getInstance().getSharedTables().containsKey(ts)) { + boolean needCache = needCache(ts); + + mapInput = createMapInput(sparkPlan, mapWork, needCache); + if (needCache) { + tsMapInputMap.put(ts, mapInput); + } + } else { + TableScanOperator retainedTs = SharedTable.getInstance().getSharedTables().get(ts); + if (tsMapInputMap.containsKey(retainedTs)) { + mapInput = tsMapInputMap.get(retainedTs); + } + } + } + return mapInput; + } + + private MapInput createMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached) throws Exception { Class ifClass = getInputFormat(jobConf, mapWork); JavaPairRDD hadoopRDD; if (mapWork.getNumMapTasks() != null) { jobConf.setNumMapTasks(mapWork.getNumMapTasks()); hadoopRDD = sc.hadoopRDD(jobConf, ifClass, - WritableComparable.class, Writable.class, mapWork.getNumMapTasks()); + WritableComparable.class, Writable.class, mapWork.getNumMapTasks()); } else { hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); } // Caching is disabled for MapInput due to HIVE-8920 - MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + //MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + MapInput result = new MapInput(sparkPlan, hadoopRDD, cached); return result; } + private boolean needCache(TableScanOperator ts) { + Map sharedTables = SharedTable.getInstance().getSharedTables(); + for(TableScanOperator key: sharedTables.keySet()){ + if( sharedTables.get(key).getOperatorId().equals(ts.getOperatorId())){ + return true; + } + } + return false; + } + private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) { Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java new file mode 100644 index 0000000..8a90256 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.Operator; + +import java.util.Collection; 
+import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +public class SharedResult { + public final List> retainableOps; + public final List> discardableOps; + public final Set> discardableInputOps; + public final long dataSize; + public final long maxDataSize; + + public SharedResult(Collection> retainableOps, Collection> discardableOps, + Set> discardableInputOps, long dataSize, long maxDataSize) { + this.retainableOps = ImmutableList.copyOf(retainableOps); + this.discardableOps = ImmutableList.copyOf(discardableOps); + this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps); + this.dataSize = dataSize; + this.maxDataSize = maxDataSize; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java new file mode 100644 index 0000000..8dd4b8e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.TableScanOperator; + +import java.util.HashMap; + +/** + * Store TS relationship, used in SparkSharedWorkOptimizer + */ +public class SharedTable { + private static SharedTable instance = null; + private HashMap sharedTables = new HashMap<>(); + + public static SharedTable getInstance() { + if (instance == null) { + instance = new SharedTable(); + } + return instance; + } + + public HashMap getSharedTables() { + return sharedTables; + } + + public void addSharedTable(TableScanOperator op1, TableScanOperator op2) { + this.sharedTables.put(op1,op2); + } + + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index d4ddb75..f3df8e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -36,6 +36,7 @@ import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.GenTezUtils; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; @@ -263,13 +265,26 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { GenTezUtils.removeSemiJoinOperator( pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); } - } else if (op instanceof AppMasterEventOperator) { - DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); - if (!sr.discardableOps.contains(dped.getTableScan()) && - !sr.discardableInputOps.contains(dped.getTableScan())) { - GenTezUtils.removeSemiJoinOperator( - pctx, (AppMasterEventOperator) op, dped.getTableScan()); + } else if (op.getConf() instanceof DynamicPruningEventDesc || op.getConf() instanceof SparkPartitionPruningSinkDesc) { + if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + DynamicPruningEventDesc dped =(DynamicPruningEventDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + GenTezUtils.removeSemiJoinOperator( + pctx, (AppMasterEventOperator) op, dped.getTableScan()); + } + } else if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + SparkPartitionPruningSinkDesc dped = (SparkPartitionPruningSinkDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + // current in Hive on Spark,there is no similar feature like "hive.tez.dynamic.semijoin.reduction(HIVE-16862) + /*GenTezUtils.removeSemiJoinOperator( + pctx, (AppMasterEventOperator) op, dped.getTableScan());*/ + } } + } LOG.debug("Input operator removed: {}", op); } @@ -361,7 +376,7 @@ private static void gatherDPPTableScanOps( LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); } - private static Multimap 
splitTableScanOpsByTable( + public static Multimap splitTableScanOpsByTable( ParseContext pctx) { Multimap tableNameToOps = ArrayListMultimap.create(); // Sort by operator ID so we get deterministic results @@ -375,7 +390,7 @@ private static void gatherDPPTableScanOps( return tableNameToOps; } - private static List> rankTablesByAccumulatedSize(ParseContext pctx) { + public static List> rankTablesByAccumulatedSize(ParseContext pctx) { Map tableToTotalSize = new HashMap<>(); for (Entry e : pctx.getTopOps().entrySet()) { TableScanOperator tsOp = e.getValue(); @@ -402,7 +417,7 @@ public int compare(Map.Entry o1, Map.Entry o2) { return sortedTables; } - private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, + public static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException { // First we check if the two table scan operators can actually be merged // If schemas do not match, we currently do not merge @@ -486,11 +501,11 @@ private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache return true; } - private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, + public static SharedResult extractSharedOptimizationInfo(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator retainableTsOp, TableScanOperator discardableTsOp) throws SemanticException { - Set> retainableOps = new LinkedHashSet<>(); + Set> retainableOps = new LinkedHashSet<>(); Set> discardableOps = new LinkedHashSet<>(); Set> discardableInputOps = new HashSet<>(); long dataSize = 0l; @@ -591,10 +606,18 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, equalOp2 = currentOp2; retainableOps.add(equalOp1); discardableOps.add(equalOp2); + long noconditionalTaskSize = pctx.getConf().getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); if (equalOp1 instanceof MapJoinOperator) { MapJoinOperator mop = (MapJoinOperator) equalOp1; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } if (currentOp1.getChildOperators().size() > 1 || currentOp2.getChildOperators().size() > 1) { @@ -608,11 +631,19 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, // Add the rest to the memory consumption Set> opsWork1 = findWorkOperators(optimizerCache, currentOp1); + long noconditionalTaskSize = pctx.getConf().getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); for (Operator op : opsWork1) { if (op instanceof MapJoinOperator && !retainableOps.contains(op)) { MapJoinOperator mop = (MapJoinOperator) op; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + + } else if 
(HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } } Set> opsWork2 = findWorkOperators(optimizerCache, currentOp2); @@ -620,7 +651,13 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, if (op instanceof MapJoinOperator && !discardableOps.contains(op)) { MapJoinOperator mop = (MapJoinOperator) op; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } } @@ -797,7 +834,7 @@ private static boolean compareOperator(ParseContext pctx, Operator op1, Opera return op1.logicalEquals(op2); } - private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, + public static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, SharedResult sr) { // We check whether merging the works would cause the size of @@ -1012,11 +1049,20 @@ private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizer if (sjbi != null) { set.addAll(findWorkOperators(optimizerCache, sjbi.getTsOp())); } - } else if(op.getConf() instanceof DynamicPruningEventDesc) { + } else if(op.getConf() instanceof DynamicPruningEventDesc || op.getConf() instanceof SparkPartitionPruningSinkDesc) { // DPP work is considered a descendant because work needs // to finish for it to execute - set.addAll(findWorkOperators( - optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan())); + if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + //for tez + set.addAll(findWorkOperators( + optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan())); + }else if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + //for spark + set.addAll(findWorkOperators( + optimizerCache, ((SparkPartitionPruningSinkDesc) op.getConf()).getTableScan())); + } } } workOps = set; @@ -1064,7 +1110,7 @@ private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizer return found; } - private static void pushFilterToTopOfTableScan( + public static void pushFilterToTopOfTableScan( SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp) throws UDFArgumentException { ExprNodeGenericFuncDesc tableScanExprNode = tsOp.getConf().getFilterExpr(); @@ -1104,75 +1150,4 @@ private static void pushFilterToTopOfTableScan( } } } - - private static class SharedResult { - final List> retainableOps; - final List> discardableOps; - final Set> discardableInputOps; - final long dataSize; - final long maxDataSize; - - private SharedResult(Collection> retainableOps, Collection> discardableOps, - Set> discardableInputOps, long dataSize, long maxDataSize) { - this.retainableOps = ImmutableList.copyOf(retainableOps); - this.discardableOps = ImmutableList.copyOf(discardableOps); - this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps); - this.dataSize = dataSize; - this.maxDataSize = maxDataSize; - } - } - - /** Cache to accelerate optimization */ - private static class 
SharedWorkOptimizerCache { - // Operators that belong to each work - final HashMultimap, Operator> operatorToWorkOperators = - HashMultimap., Operator>create(); - // Table scan operators to DPP sources - final Multimap> tableScanToDPPSource = - HashMultimap.>create(); - - // Add new operator to cache work group of existing operator (if group exists) - void putIfWorkExists(Operator opToAdd, Operator existingOp) { - List> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp)); - if (!c.isEmpty()) { - for (Operator op : c) { - operatorToWorkOperators.get(op).add(opToAdd); - } - operatorToWorkOperators.putAll(opToAdd, c); - operatorToWorkOperators.put(opToAdd, opToAdd); - } - } - - // Remove operator - void removeOp(Operator opToRemove) { - Set> s = operatorToWorkOperators.get(opToRemove); - s.remove(opToRemove); - List> c1 = ImmutableList.copyOf(s); - if (!c1.isEmpty()) { - for (Operator op1 : c1) { - operatorToWorkOperators.remove(op1, opToRemove); // Remove operator - } - operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator - } - } - - // Remove operator and combine - void removeOpAndCombineWork(Operator opToRemove, Operator replacementOp) { - Set> s = operatorToWorkOperators.get(opToRemove); - s.remove(opToRemove); - List> c1 = ImmutableList.copyOf(s); - List> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp)); - if (!c1.isEmpty() && !c2.isEmpty()) { - for (Operator op1 : c1) { - operatorToWorkOperators.remove(op1, opToRemove); // Remove operator - operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection - } - operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator - for (Operator op2 : c2) { - operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection - } - } - } - } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java new file mode 100644 index 0000000..0b72a07 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; + +import java.util.List; +import java.util.Set; + +/** + * Cache to accelerate optimization + */ +public class SharedWorkOptimizerCache { + // Operators that belong to each work + final HashMultimap, Operator> operatorToWorkOperators = + HashMultimap., Operator>create(); + // Table scan operators to DPP sources + public final Multimap> tableScanToDPPSource = + HashMultimap.>create(); + + // Add new operator to cache work group of existing operator (if group exists) + void putIfWorkExists(Operator opToAdd, Operator existingOp) { + List> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp)); + if (!c.isEmpty()) { + for (Operator op : c) { + operatorToWorkOperators.get(op).add(opToAdd); + } + operatorToWorkOperators.putAll(opToAdd, c); + operatorToWorkOperators.put(opToAdd, opToAdd); + } + } + + // Remove operator + public void removeOp(Operator opToRemove) { + Set> s = operatorToWorkOperators.get(opToRemove); + s.remove(opToRemove); + List> c1 = ImmutableList.copyOf(s); + if (!c1.isEmpty()) { + for (Operator op1 : c1) { + operatorToWorkOperators.remove(op1, opToRemove); // Remove operator + } + operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator + } + } + + // Remove operator and combine + public void removeOpAndCombineWork(Operator opToRemove, Operator replacementOp) { + Set> s = operatorToWorkOperators.get(opToRemove); + s.remove(opToRemove); + List> c1 = ImmutableList.copyOf(s); + List> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp)); + if (!c1.isEmpty() && !c2.isEmpty()) { + for (Operator op1 : c1) { + operatorToWorkOperators.remove(op1, opToRemove); // Remove operator + operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection + } + operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator + for (Operator op2 : c2) { + operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index 8425911..51011b6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -99,6 +99,9 @@ LOG.info("Convert to non-bucketed map join"); MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); + //Refer tez code ConvertJoinMapJoin.estimateNumBuckets, if numBuckets <0, then defaultNumBucket is 1 + int defaultNumBucket = 1; + mapJoinOp.getConf().setInMemoryDataSize(mapJoinInfo[2] / defaultNumBucket); // For native vectorized map join, we require the key SerDe to be BinarySortableSerDe // Note: the MJ may not really get natively-vectorized later, // but changing SerDe won't hurt correctness @@ -115,6 +118,9 @@ LOG.info("Converted to map join with " + numBuckets + " buckets"); bucketColNames = joinOp.getOpTraits().getBucketColNames(); mapJoinInfo[2] /= numBuckets; + //Refer tez code in ConvertJoinMapJoin.getMapJoinConversionPos() + mapJoinOp.getConf().setInMemoryDataSize(mapJoinInfo[2]); + } else { LOG.info("Can not convert to bucketed map join"); } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java new file mode 100644 index 0000000..621db27 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.spark; + +import com.google.common.collect.Lists; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.optimizer.SharedResult; +import org.apache.hadoop.hive.ql.optimizer.SharedTable; +import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer; +import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizerCache; +import org.apache.hadoop.hive.ql.optimizer.Transform; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; +import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ArrayListMultimap; +/** + * Shared computation optimizer. + * + *
Originally, this rule would find scan operators over the same table
 + * in the query plan and merge them if they met some preconditions.
 + *
 + *    TS    TS          TS
 + *    |     |     ->   /  \
 + *    Op    Op        Op  Op
 + *
 + *
A limitation in the current implementation is that the optimizer does not + * go beyond a work boundary. + * + *
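+ * Unlike the Tez version, the Spark port does not physically rewire the two
+ * branches into a single work. It records the discardable-to-retainable
+ * table scan mapping in SharedTable; SparkPlanGenerator then creates one
+ * cached MapInput RDD for the retainable scan and reuses it for every
+ * discardable scan mapped to it.
+ *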
The optimization works with the Spark execution engine. + **/ + +public class SparkSharedWorkOptimizer extends Transform { + + private final static Logger LOG = LoggerFactory.getLogger(SparkSharedWorkOptimizer.class); + + private HashMap replaceFilterTSMap = new HashMap(); + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + final Map topOps = pctx.getTopOps(); + if (topOps.size() < 2) { + // Nothing to do, bail out + return pctx; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Before SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + // Cache to use during optimization + SharedWorkOptimizerCache optimizerCache = new SharedWorkOptimizerCache(); + + // Gather information about the DPP table scans and store it in the cache + gatherDPPTableScanOps(pctx, optimizerCache); + + // Map of dbName.TblName -> TSOperator + Multimap tableNameToOps = SharedWorkOptimizer.splitTableScanOpsByTable(pctx); + + // We enforce a certain order when we do the reutilization. + // In particular, we use size of table x number of reads to + // rank the tables. + List> sortedTables = SharedWorkOptimizer.rankTablesByAccumulatedSize(pctx); + LOG.debug("Sorted tables by size: {}", sortedTables); + + // Execute optimization + Multimap existingOps = ArrayListMultimap.create(); + Set> removedOps = new HashSet<>(); + for (Map.Entry tablePair : sortedTables) { + String tableName = tablePair.getKey(); + for (TableScanOperator discardableTsOp : tableNameToOps.get(tableName)) { + if (removedOps.contains(discardableTsOp)) { + LOG.debug("Skip {} as it has been already removed", discardableTsOp); + continue; + } + Collection prevTsOps = existingOps.get(tableName); + for (TableScanOperator retainableTsOp : prevTsOps) { + if (removedOps.contains(retainableTsOp)) { + LOG.debug("Skip {} as it has been already removed", retainableTsOp); + continue; + } + + // First we quickly check if the two table scan operators can actually be merged + boolean mergeable = SharedWorkOptimizer.areMergeable(pctx, optimizerCache, retainableTsOp, discardableTsOp); + if (!mergeable) { + // Skip + LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp); + continue; + } + + // Secondly, we extract information about the part of the tree that can be merged + // as well as some structural information (memory consumption) that needs to be + // used to determined whether the merge can happen + SharedResult sr = SharedWorkOptimizer.extractSharedOptimizationInfo( + pctx, optimizerCache, retainableTsOp, discardableTsOp); + + // It seems these two operators can be merged. + // Check that plan meets some preconditions before doing it. + // In particular, in the presence of map joins in the upstream plan: + // - we cannot exceed the noconditional task size, and + // - if we already merged the big table, we cannot merge the broadcast + // tables. 
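+          // In short (see SharedWorkOptimizer.validPreConditions for the full
+          // structural checks): merging is rejected when sr.dataSize exceeds
+          // sr.maxDataSize, i.e. when the accumulated in-memory map-join data
+          // of both branches would exceed the noconditional task threshold
+          // substituted for Spark above.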
+ if (!SharedWorkOptimizer.validPreConditions(pctx, optimizerCache, sr)) { + // Skip + LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp); + continue; + } + + // We can merge + if (sr.retainableOps.size() > 1) { + //TODO find the suitable case for this branch + // More than TS operator + Operator lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1); + Operator lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1); + if (lastDiscardableOp.getNumChild() != 0) { + List> allChildren = + Lists.newArrayList(lastDiscardableOp.getChildOperators()); + for (Operator op : allChildren) { + lastDiscardableOp.getChildOperators().remove(op); + op.replaceParent(lastDiscardableOp, lastRetainableOp); + lastRetainableOp.getChildOperators().add(op); + } + } + + LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); + } else { + // Only TS operator + ExprNodeGenericFuncDesc exprNode = null; + if (retainableTsOp.getConf().getFilterExpr() != null) { + // Push filter on top of children + SharedWorkOptimizer.pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + // Clone to push to table scan + exprNode = (ExprNodeGenericFuncDesc) retainableTsOp.getConf().getFilterExpr(); + } + if (discardableTsOp.getConf().getFilterExpr() != null) { + // Push filter on top + SharedWorkOptimizer.pushFilterToTopOfTableScan(optimizerCache, discardableTsOp); + ExprNodeGenericFuncDesc tsExprNode = discardableTsOp.getConf().getFilterExpr(); + if (exprNode != null && !exprNode.isSame(tsExprNode)) { + // We merge filters from previous scan by ORing with filters from current scan + if (exprNode.getGenericUDF() instanceof GenericUDFOPOr) { + List newChildren = new ArrayList<>(exprNode.getChildren().size() + 1); + for (ExprNodeDesc childExprNode : exprNode.getChildren()) { + if (childExprNode.isSame(tsExprNode)) { + // We do not need to do anything, it is in the OR expression + break; + } + newChildren.add(childExprNode); + } + if (exprNode.getChildren().size() == newChildren.size()) { + newChildren.add(tsExprNode); + exprNode = ExprNodeGenericFuncDesc.newInstance( + new GenericUDFOPOr(), + newChildren); + } + } else { + exprNode = ExprNodeGenericFuncDesc.newInstance( + new GenericUDFOPOr(), + Arrays.asList(exprNode, tsExprNode)); + } + } + } + // Replace filter + retainableTsOp.getConf().setFilterExpr(exprNode); + LOG.info("SharedTable.getInstance().addSharedTable< "+discardableTsOp.getOperatorId()+ " ,"+retainableTsOp.getOperatorId()+" >"); + SharedTable.getInstance().addSharedTable(discardableTsOp,retainableTsOp); + replaceFilterTSMap.put(discardableTsOp, retainableTsOp); + LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp); + } + + // First we remove the input operators of the expression that + // we are going to eliminate + for (Operator op : sr.discardableInputOps) { + //TODO Verify we need optimizerCache.removeOp(op) + optimizerCache.removeOp(op); + removedOps.add(op); + // Remove DPP predicates + if (op instanceof ReduceSinkOperator) { + SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op); + if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) && + !sr.discardableInputOps.contains(sjbi.getTsOp())) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator( +// pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); + } + } else if (op instanceof AppMasterEventOperator) { + DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); + if 
(!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator( +// pctx, (AppMasterEventOperator) op, dped.getTableScan()); + } + } + LOG.debug("Input operator removed: {}", op); + } + + removedOps.add(discardableTsOp); + // Finally we remove the expression from the tree + for (Operator op : sr.discardableOps) { + //TODO Verify we need optimizerCache.removeOp(op) + optimizerCache.removeOp(op); + removedOps.add(op); + if (sr.discardableOps.size() == 1) { + // If there is a single discardable operator, it is a TableScanOperator + // and it means that we have merged filter expressions for it. Thus, we + // might need to remove DPP predicates from the retainable TableScanOperator + Collection> c = + optimizerCache.tableScanToDPPSource.get((TableScanOperator) op); + for (Operator dppSource : c) { + if (dppSource instanceof ReduceSinkOperator) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator(pctx, +// (ReduceSinkOperator) dppSource, +// (TableScanOperator) sr.retainableOps.get(0)); + } else if (dppSource instanceof AppMasterEventOperator) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator(pctx, +// (AppMasterEventOperator) dppSource, +// (TableScanOperator) sr.retainableOps.get(0)); + } + } + } + LOG.debug("Operator removed: {}", op); + } + + break; + } + + if (removedOps.contains(discardableTsOp)) { + // This operator has been removed, remove it from the list of existing operators + existingOps.remove(tableName, discardableTsOp); + } else { + // This operator has not been removed, include it in the list of existing operators + existingOps.put(tableName, discardableTsOp); + } + } + } + + // Remove unused table scan operators + Iterator> it = topOps.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + if (e.getValue().getNumChild() == 0) { + it.remove(); + } + } + + //combine all conditions of TS to 1 condition + for(TableScanOperator discardableTsOp:replaceFilterTSMap.keySet()){ + TableScanOperator retainedTsOp = replaceFilterTSMap.get(discardableTsOp); + if( retainedTsOp.getConf().getFilterExpr()!= null) { + discardableTsOp.getConf().setFilterExpr( + retainedTsOp.getConf().getFilterExpr()); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + + return pctx; + } + + private static void gatherDPPTableScanOps( + ParseContext pctx, SharedWorkOptimizerCache optimizerCache) throws SemanticException { + // Find TS operators with partition pruning enabled in plan + // because these TS may potentially read different data for + // different pipeline. + // These can be: + // 1) TS with DPP. + // 2) TS with semijoin DPP. 
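+    // On Spark the DPP sources are SparkPartitionPruningSinkOperators (the
+    // Tez version looks for AppMasterEventOperators instead), so the operator
+    // tree is searched for those sinks and each one is indexed by the
+    // TableScanOperator it prunes.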
+ Map topOps = pctx.getTopOps(); + Collection> tableScanOps = + Lists.>newArrayList(topOps.values()); + Set s = + OperatorUtils.findOperators(tableScanOps, SparkPartitionPruningSinkOperator.class); + for (SparkPartitionPruningSinkOperator a : s) { + if (a.getConf() instanceof SparkPartitionPruningSinkDesc) { + SparkPartitionPruningSinkDesc dped = (SparkPartitionPruningSinkDesc) a.getConf(); + optimizerCache.tableScanToDPPSource.put(dped.getTableScan(), a); + } + } + for (Map.Entry e + : pctx.getRsToSemiJoinBranchInfo().entrySet()) { + optimizerCache.tableScanToDPPSource.put(e.getValue().getTsOp(), e.getKey()); + } + LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 965044d..4701c2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -28,6 +28,9 @@ import java.util.Stack; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; @@ -81,6 +84,7 @@ import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkSharedWorkOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver; import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver; import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics; @@ -138,6 +142,10 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, new ConstantPropagate(ConstantPropagateProcCtx.ConstantPropagateOption.SHORTCUT).transform(pCtx); } + if (procCtx.getParseContext().getConf().getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + new SparkSharedWorkOptimizer().transform(procCtx.getParseContext()); + } + PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE); } @@ -412,10 +420,22 @@ protected void generateTaskTree(List> rootTasks, Pa private void generateTaskTreeHelper(GenSparkProcContext procCtx, List topNodes) throws SemanticException { - // create a walker which walks the tree in a DFS manner while maintaining - // the operator stack. The dispatcher generates the plan from the operator tree - Map opRules = new LinkedHashMap(); +// // create a walker which walks the tree in a DFS manner while maintaining +// // the operator stack. 
The dispatcher generates the plan from the operator tree +// HashMap opRules1 = new LinkedHashMap(); +// opRules1.put(new RuleRegExp("Handle Analyze Command", +// TableScanOperator.getOperatorName() + "%"), +// new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())); +// Dispatcher disp1 = new DefaultRuleDispatcher(null, opRules1, procCtx); +// GraphWalker ogw1 = new GenSparkWorkWalker(disp1, procCtx); +// ogw1.startWalking(topNodes, null); + + GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils()); + HashMap opRules = new LinkedHashMap(); +// if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { +// opRules.put(new RuleRegExp("Split work -TableScan", TableScanOperator.getOperatorName() + "%"), genSparkWork); +// } opRules.put(new RuleRegExp("Split Work - ReduceSink", ReduceSinkOperator.getOperatorName() + "%"), genSparkWork); @@ -429,9 +449,7 @@ private void generateTaskTreeHelper(GenSparkProcContext procCtx, List topN FileSinkOperator.getOperatorName() + "%"), new CompositeProcessor(new SparkFileSinkProcessor(), genSparkWork)); - opRules.put(new RuleRegExp("Handle Analyze Command", - TableScanOperator.getOperatorName() + "%"), - new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())); + opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"), new NodeProcessor() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java new file mode 100644 index 0000000..c2c8e76 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.spark; + +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.Stack; + +public class SparkRuleDispatcher implements Dispatcher { + private final Multimap procRules; + private final NodeProcessorCtx procCtx; + private final Set defaultProcSet; + + /** + * Constructor. 
+ * + * @param defaultProc default processor to be fired if no rule matches + * @param rules operator processor that handles actual processing of the node + * @param procCtx operator processor context, which is opaque to the dispatcher + */ + public SparkRuleDispatcher(NodeProcessor defaultProc, + Multimap rules, NodeProcessorCtx procCtx) { + this.defaultProcSet = new HashSet(); + this.defaultProcSet.add(defaultProc); + procRules = rules; + this.procCtx = procCtx; + } + + /** + * Dispatcher function. + * + * @param nd operator to process + * @param ndStack the operators encountered so far + * @throws SemanticException + */ + @Override + public Object dispatch(Node nd, Stack ndStack, Object... nodeOutputs) + throws SemanticException { + + // find the firing rule + // find the rule from the stack specified + Rule rule = null; + int minCost = Integer.MAX_VALUE; + for (Rule r : procRules.keySet()) { + int cost = r.cost(ndStack); + if ((cost >= 0) && (cost < minCost)) { + minCost = cost; + rule = r; + } + } + + Collection procSet; + + if (rule == null) { + procSet = defaultProcSet; + } else { + procSet = procRules.get(rule); + } + + // Do nothing in case proc is null + Object ret = null; + for (NodeProcessor proc : procSet) { + if (proc != null) { + // Call the process function + ret = proc.process(nd, ndStack, procCtx, nodeOutputs); + } + } + return ret; + } +} -- 1.8.4.GIT From 6f7a917adefbd253f9893bc8338ee9a98f2e7388 Mon Sep 17 00:00:00 2001 From: kellyzly Date: Tue, 26 Dec 2017 14:46:16 +0800 Subject: [PATCH 02/11] not merge two branch --- .../optimizer/spark/SparkSharedWorkOptimizer.java | 36 ++++++++++++---------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java index 621db27..dac19ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java @@ -148,21 +148,23 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { // We can merge if (sr.retainableOps.size() > 1) { - //TODO find the suitable case for this branch - // More than TS operator - Operator lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1); - Operator lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1); - if (lastDiscardableOp.getNumChild() != 0) { - List> allChildren = - Lists.newArrayList(lastDiscardableOp.getChildOperators()); - for (Operator op : allChildren) { - lastDiscardableOp.getChildOperators().remove(op); - op.replaceParent(lastDiscardableOp, lastRetainableOp); - lastRetainableOp.getChildOperators().add(op); - } - } + //For hive on tez, this feature may will merge two branches if more than 1 operator(TS) are same in the two + //branches. but for hive on spark, currently this is no need( not support). 
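+          // That is: on Tez the children of the last discardable operator are
+          // rewired onto the last retainable one (code kept below for
+          // reference), while on Spark only the table scan itself is shared,
+          // by caching and reusing its MapInput RDD, so the deeper merge
+          // stays disabled.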
+ +// // More than TS operator +// Operator lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1); +// Operator lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1); +// if (lastDiscardableOp.getNumChild() != 0) { +// List> allChildren = +// Lists.newArrayList(lastDiscardableOp.getChildOperators()); +// for (Operator op : allChildren) { +// lastDiscardableOp.getChildOperators().remove(op); +// op.replaceParent(lastDiscardableOp, lastRetainableOp); +// lastRetainableOp.getChildOperators().add(op); +// } +// } - LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); +// LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); } else { // Only TS operator ExprNodeGenericFuncDesc exprNode = null; @@ -201,13 +203,15 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { } } // Replace filter + //TODO: understand why filters are not combined when there is more than 1 operator(TS) are same + //in the two branch retainableTsOp.getConf().setFilterExpr(exprNode); - LOG.info("SharedTable.getInstance().addSharedTable< "+discardableTsOp.getOperatorId()+ " ,"+retainableTsOp.getOperatorId()+" >"); - SharedTable.getInstance().addSharedTable(discardableTsOp,retainableTsOp); replaceFilterTSMap.put(discardableTsOp, retainableTsOp); LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp); } + LOG.info("SharedTable.getInstance().addSharedTable< "+discardableTsOp.getOperatorId()+ " ,"+retainableTsOp.getOperatorId()+" >"); + SharedTable.getInstance().addSharedTable(discardableTsOp,retainableTsOp); // First we remove the input operators of the expression that // we are going to eliminate for (Operator op : sr.discardableInputOps) { -- 1.8.4.GIT From cf49d5dcf7ba9bfc3a7f826e6a850d70da6aae8e Mon Sep 17 00:00:00 2001 From: kellyzly Date: Wed, 27 Dec 2017 15:01:52 +0800 Subject: [PATCH 03/11] fix NPE --- .../hadoop/hive/ql/exec/spark/SparkPlanGenerator.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index bc1148e..a16375e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -194,14 +194,14 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean JobConf jobConf = cloneJobConf(mapWork); MapInput mapInput = null; if (jobConf.getBoolean(HiveConf.ConfVars.PLAN.HIVE_SPARK_SHARED_WORK_OPTIMIZATION.varname, false) == false) { - mapInput = createMapInput(sparkPlan, mapWork, false); + mapInput = createMapInput(jobConf, sparkPlan, mapWork, false); } else { Operator operator = mapWork.getAllRootOperators().iterator().next(); TableScanOperator ts = (TableScanOperator) operator; if (!SharedTable.getInstance().getSharedTables().containsKey(ts)) { boolean needCache = needCache(ts); - mapInput = createMapInput(sparkPlan, mapWork, needCache); + mapInput = createMapInput(jobConf, sparkPlan, mapWork, needCache); if (needCache) { tsMapInputMap.put(ts, mapInput); } @@ -215,16 +215,16 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean return mapInput; } - private MapInput createMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached) throws Exception { - Class ifClass = getInputFormat(jobConf, mapWork); + private 
MapInput createMapInput(JobConf jobConf, SparkPlan sparkPlan, MapWork mapWork, boolean cached) throws Exception { + Class ifClass = getInputFormat(this.jobConf, mapWork); JavaPairRDD hadoopRDD; if (mapWork.getNumMapTasks() != null) { - jobConf.setNumMapTasks(mapWork.getNumMapTasks()); - hadoopRDD = sc.hadoopRDD(jobConf, ifClass, + this.jobConf.setNumMapTasks(mapWork.getNumMapTasks()); + hadoopRDD = sc.hadoopRDD(this.jobConf, ifClass, WritableComparable.class, Writable.class, mapWork.getNumMapTasks()); } else { - hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); + hadoopRDD = sc.hadoopRDD(this.jobConf, ifClass, WritableComparable.class, Writable.class); } // Caching is disabled for MapInput due to HIVE-8920 -- 1.8.4.GIT From 71967890947ec54b0a02167926b5a653ba691599 Mon Sep 17 00:00:00 2001 From: kellyzly Date: Wed, 27 Dec 2017 15:22:07 +0800 Subject: [PATCH 04/11] fix NPE-2 --- .../org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index a16375e..167ceaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -216,15 +216,15 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean } private MapInput createMapInput(JobConf jobConf, SparkPlan sparkPlan, MapWork mapWork, boolean cached) throws Exception { - Class ifClass = getInputFormat(this.jobConf, mapWork); + Class ifClass = getInputFormat(jobConf, mapWork); JavaPairRDD hadoopRDD; if (mapWork.getNumMapTasks() != null) { - this.jobConf.setNumMapTasks(mapWork.getNumMapTasks()); - hadoopRDD = sc.hadoopRDD(this.jobConf, ifClass, + jobConf.setNumMapTasks(mapWork.getNumMapTasks()); + hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class, mapWork.getNumMapTasks()); } else { - hadoopRDD = sc.hadoopRDD(this.jobConf, ifClass, WritableComparable.class, Writable.class); + hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); } // Caching is disabled for MapInput due to HIVE-8920 -- 1.8.4.GIT From 484008b04b08166c6ec1af14d752f12e3526fcd4 Mon Sep 17 00:00:00 2001 From: kellyzly Date: Wed, 27 Dec 2017 17:08:10 +0800 Subject: [PATCH 05/11] print more log about why rdd cache is not enabled in yarn mode --- .../hadoop/hive/ql/exec/spark/SparkPlanGenerator.java | 12 +++++++++--- .../org/apache/hadoop/hive/ql/optimizer/SharedTable.java | 14 +++++++++++++- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 167ceaa..29b9773 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -190,7 +190,8 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, @SuppressWarnings("unchecked") private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached) throws Exception { - LOG.info("MapWork :" + mapWork.getName() + " cached:" + cached); + //LOG.info("MapWork :" + mapWork.getName() + " cached:" + cached); + SharedTable.getInstance().print(); JobConf jobConf = cloneJobConf(mapWork); MapInput 
From 484008b04b08166c6ec1af14d752f12e3526fcd4 Mon Sep 17 00:00:00 2001
From: kellyzly
Date: Wed, 27 Dec 2017 17:08:10 +0800
Subject: [PATCH 05/11] print more logs about why the RDD cache is not enabled
 in yarn mode

---
 .../hadoop/hive/ql/exec/spark/SparkPlanGenerator.java    | 12 +++++++++---
 .../org/apache/hadoop/hive/ql/optimizer/SharedTable.java |  9 ++++++++-
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 167ceaa..29b9773 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -190,7 +190,8 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
 
   private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached)
       throws Exception {
-    LOG.info("MapWork :" + mapWork.getName() + " cached:" + cached);
+    //LOG.info("MapWork :" + mapWork.getName() + " cached:" + cached);
+    SharedTable.getInstance().print();
     JobConf jobConf = cloneJobConf(mapWork);
     MapInput mapInput = null;
     if (jobConf.getBoolean(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION.varname, false) == false) {
@@ -201,14 +202,19 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean
 
       if (!SharedTable.getInstance().getSharedTables().containsKey(ts)) {
         boolean needCache = needCache(ts);
-      mapInput = createMapInput(jobConf, sparkPlan, mapWork, needCache);
-      if (needCache) {
+        mapInput = createMapInput(jobConf, sparkPlan, mapWork, needCache);
+        if (needCache) {
           tsMapInputMap.put(ts, mapInput);
         }
       } else {
         TableScanOperator retainedTs = SharedTable.getInstance().getSharedTables().get(ts);
 
+        if (tsMapInputMap.containsKey(retainedTs)) {
           mapInput = tsMapInputMap.get(retainedTs);
+        } else {
+          // It is unclear why execution would ever reach this branch.
+          LOG.info("Cannot find the MapInput of the retained TS; creating an uncached one");
+          mapInput = createMapInput(jobConf, sparkPlan, mapWork, false);
         }
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java
index 8dd4b8e..a746e99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedTable.java
@@ -21,11 +21,13 @@
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 
 import java.util.HashMap;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * Store TS relationship, used in SparkSharedWorkOptimizer
  */
 public class SharedTable {
+  private static final Logger LOG = LoggerFactory.getLogger(SharedTable.class);
   private static SharedTable instance = null;
   private HashMap<TableScanOperator, TableScanOperator> sharedTables = new HashMap<>();
 
@@ -45,4 +47,9 @@ public void addSharedTable(TableScanOperator op1, TableScanOperator op2) {
 
   }
 
+  public void print() {
+    for (TableScanOperator key : sharedTables.keySet()) {
+      LOG.info("key:" + key.getOperatorId() + " value:" + sharedTables.get(key).getOperatorId());
+    }
+  }
 }
-- 
1.8.4.GIT
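The branching that patch 05 instruments reduces to one invariant: the retained scan's MapInput is built once and cached, and every discardable scan resolves to it through the shared map. A toy model of that flow, using strings in place of TableScanOperator and MapInput (all names illustrative), including the fallback branch the patch logs:

    import java.util.HashMap;
    import java.util.Map;

    class SharedScanRegistry {
      // discardable TS id -> retainable TS id, mirroring SharedTable.sharedTables
      private final Map<String, String> shared = new HashMap<>();
      // retained TS id -> the input built for it, mirroring tsMapInputMap
      private final Map<String, String> builtInputs = new HashMap<>();

      void addSharedTable(String discardable, String retainable) {
        shared.put(discardable, retainable);
      }

      // A scan must be cached iff some discardable scan maps to it.
      boolean needCache(String ts) {
        return shared.containsValue(ts);
      }

      String inputFor(String ts) {
        if (!shared.containsKey(ts)) {
          String input = "input(" + ts + ")";
          if (needCache(ts)) {
            builtInputs.put(ts, input); // retained scan: remember it for reuse
          }
          return input;
        }
        // Discardable scan: reuse the retained scan's input when it exists,
        // otherwise fall back to a fresh uncached input, as the patch does.
        String retained = shared.get(ts);
        return builtInputs.getOrDefault(retained, "input(" + ts + ")");
      }
    }

Note the ordering assumption baked into the reuse: the retained scan's MapWork must be translated before any discardable scan that points at it, otherwise the fallback branch fires.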
From 22c03bf810a84e0de427a4fc8003ac6a9dabe090 Mon Sep 17 00:00:00 2001
From: kellyzly
Date: Thu, 28 Dec 2017 15:55:43 +0800
Subject: [PATCH 06/11] save the shared table relation in SparkWork

---
 .../hadoop/hive/ql/exec/spark/SparkPlanGenerator.java        |  6 +++---
 .../java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java |  3 ++-
 ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java    | 11 +++++++++++
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 29b9773..26cf29b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -139,7 +139,7 @@
 
     SparkTran result;
     if (work instanceof MapWork) {
-      result = generateMapInput(sparkPlan, (MapWork)work, isCachingWork(work, sparkWork));
+      result = generateMapInput(sparkPlan, (MapWork)work, isCachingWork(work, sparkWork), sparkWork);
       sparkPlan.addTran(result);
     } else if (work instanceof ReduceWork) {
       List<BaseWork> parentWorks = sparkWork.getParents(work);
@@ -188,7 +188,7 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
   }
 
   @SuppressWarnings("unchecked")
-  private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached)
+  private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached, SparkWork sparkWork)
       throws Exception {
     //LOG.info("MapWork :" + mapWork.getName() + " cached:" + cached);
     SharedTable.getInstance().print();
@@ -207,7 +207,7 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean
         tsMapInputMap.put(ts, mapInput);
       }
     } else {
-      TableScanOperator retainedTs = SharedTable.getInstance().getSharedTables().get(ts);
+      TableScanOperator retainedTs = sparkWork.getSharedTableMap().get(ts);
 
       if (tsMapInputMap.containsKey(retainedTs)) {
         mapInput = tsMapInputMap.get(retainedTs);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 0f5f708..ebfc7ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -29,6 +29,7 @@
 
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.ql.optimizer.SharedTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -107,7 +108,7 @@ public int execute(DriverContext driverContext) {
       SparkWork sparkWork = getWork();
       sparkWork.setRequiredCounterPrefix(getOperatorCounters());
-
+      sparkWork.setSharedTableMap(SharedTable.getInstance().getSharedTables());
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
       jobRef = sparkSession.submit(driverContext, sparkWork);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
index 8332272..3e60c15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
@@ -34,6 +34,7 @@
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
@@ -67,6 +68,8 @@
 
   private Map<BaseWork, BaseWork> cloneToWork;
 
+  private Map<TableScanOperator, TableScanOperator> sharedTableMap;
+
   public SparkWork(String queryId) {
     this.queryId = queryId;
     this.dagName = queryId + ":" + counter.getAndIncrement();
@@ -434,4 +437,12 @@ public String toString() {
   public void setCloneToWork(Map<BaseWork, BaseWork> cloneToWork) {
     this.cloneToWork = cloneToWork;
   }
+
+  public Map<TableScanOperator, TableScanOperator> getSharedTableMap() {
+    return sharedTableMap;
+  }
+
+  public void setSharedTableMap(Map<TableScanOperator, TableScanOperator> sharedTableMap) {
+    this.sharedTableMap = sharedTableMap;
+  }
 }
-- 
1.8.4.GIT


From 30debaec3aff71a816db3685a7d9dd08b02db1e1 Mon Sep 17 00:00:00 2001
From: kellyzly
Date: Fri, 29 Dec 2017 16:06:22 +0800
Subject: [PATCH 07/11] use SparkWork#getSharedTableMap instead of the
 SharedTable singleton in SparkPlanGenerator

---
 .../hadoop/hive/ql/exec/spark/SparkPlanGenerator.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 26cf29b..1b53db9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -25,6 +25,7 @@
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
 import org.apache.hadoop.hive.ql.optimizer.SharedTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -191,7 +192,8 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
   private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached, SparkWork sparkWork)
       throws Exception {
     //LOG.info("MapWork :" + mapWork.getName() + " cached:" + cached);
-    SharedTable.getInstance().print();
+    Map<TableScanOperator, TableScanOperator> sharedTableMap = sparkWork.getSharedTableMap();
+    print(sharedTableMap);
     JobConf jobConf = cloneJobConf(mapWork);
     MapInput mapInput = null;
     if (jobConf.getBoolean(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION.varname, false) == false) {
@@ -199,7 +201,7 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean
     } else {
       Operator operator = mapWork.getAllRootOperators().iterator().next();
       TableScanOperator ts = (TableScanOperator) operator;
 
-      if (!SharedTable.getInstance().getSharedTables().containsKey(ts)) {
+      if (!sharedTableMap.containsKey(ts)) {
         boolean needCache = needCache(ts);
         mapInput = createMapInput(jobConf, sparkPlan, mapWork, needCache);
@@ -221,6 +223,12 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean
     return mapInput;
   }
 
+  private void print(Map<TableScanOperator, TableScanOperator> sharedTableMap) {
+    for (TableScanOperator key : sharedTableMap.keySet()) {
+      LOG.info("key:" + key.getOperatorId() + " value:" + sharedTableMap.get(key).getOperatorId());
+    }
+  }
+
   private MapInput createMapInput(JobConf jobConf, SparkPlan sparkPlan, MapWork mapWork, boolean cached) throws Exception {
     Class ifClass = getInputFormat(jobConf, mapWork);
-- 
1.8.4.GIT
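Patch 06 above is the pivotal change: SharedTable is a per-JVM singleton, so a mapping registered while the plan is compiled in one process is simply absent in the remote Spark driver that deserializes the SparkWork, which would explain the empty map behind the logging added in patch 05. Only state carried by the serialized work object survives the trip. A stripped-down demonstration, with hypothetical classes and String keys standing in for operators:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    // JVM-local singleton: its contents are NOT serialized with the plan.
    enum SharedRegistry {
      INSTANCE;
      final Map<String, String> shared = new HashMap<>();
    }

    // Serializable work description: everything here survives the wire.
    class WorkPlan implements Serializable {
      Map<String, String> sharedTableMap = new HashMap<>();
    }

    public class SingletonVsPlanState {
      public static void main(String[] args) throws Exception {
        SharedRegistry.INSTANCE.shared.put("TS[3]", "TS[0]");
        WorkPlan plan = new WorkPlan();
        plan.sharedTableMap.putAll(SharedRegistry.INSTANCE.shared); // what SparkTask now does

        // Simulate shipping the plan to another process.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(plan);
        oos.close();
        WorkPlan remote = (WorkPlan) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();

        // The copied map arrives intact; a fresh singleton in the remote
        // process would still be empty.
        System.out.println(remote.sharedTableMap); // {TS[3]=TS[0]}
      }
    }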
From 75f2374381ec41b375d2d8c3bd2bd3e82d1cc867 Mon Sep 17 00:00:00 2001
From: kellyzly
Date: Fri, 29 Dec 2017 16:26:20 +0800
Subject: [PATCH 08/11] pass the shared table map into
 SparkPlanGenerator#needCache

---
 .../java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 1b53db9..eb990e4a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
-import org.apache.hadoop.hive.ql.optimizer.SharedTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -248,8 +247,7 @@ private MapInput createMapInput(JobConf jobConf, SparkPlan sparkPlan, MapWork ma
     return result;
   }
 
-  private boolean needCache(TableScanOperator ts) {
-    Map sharedTables = SharedTable.getInstance().getSharedTables();
+  private boolean needCache(TableScanOperator ts, Map<TableScanOperator, TableScanOperator> sharedTables) {
     for (TableScanOperator key : sharedTables.keySet()) {
       if (sharedTables.get(key).getOperatorId().equals(ts.getOperatorId())) {
         return true;
-- 
1.8.4.GIT


From e5df4cc513eec82650ab1dc873aec36ab7947318 Mon Sep 17 00:00:00 2001
From: kellyzly
Date: Fri, 29 Dec 2017 16:33:02 +0800
Subject: [PATCH 09/11] pass the shared table map into
 SparkPlanGenerator#needCache (part 3)

---
 .../java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index eb990e4a..e1fe019 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -202,7 +202,7 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean
       Operator operator = mapWork.getAllRootOperators().iterator().next();
       TableScanOperator ts = (TableScanOperator) operator;
       if (!sharedTableMap.containsKey(ts)) {
-        boolean needCache = needCache(ts);
+        boolean needCache = needCache(ts, sharedTableMap);
         mapInput = createMapInput(jobConf, sparkPlan, mapWork, needCache);
         if (needCache) {
-- 
1.8.4.GIT
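As reworked in patches 08 and 09, needCache answers a reverse-lookup question: is this scan the retained target of any shared pair? Assuming operator ids identify scans uniquely, the loop is equivalent to this compact form (string ids as stand-ins for TableScanOperators):

    import java.util.Map;
    import java.util.Objects;

    final class NeedCacheSketch {
      private NeedCacheSketch() {
      }

      // Equivalent to the loop in the patch: true iff some discardable scan
      // maps to a scan whose operator id matches ts.
      static boolean needCache(String tsOperatorId, Map<String, String> sharedTables) {
        return sharedTables.values().stream()
            .anyMatch(retainedId -> Objects.equals(retainedId, tsOperatorId));
      }
    }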
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -202,7 +202,7 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean
       Operator operator = mapWork.getAllRootOperators().iterator().next();
       TableScanOperator ts = (TableScanOperator) operator;
       if (!sharedTableMap.containsKey(ts)) {
-        boolean needCache = needCache(ts);
+        boolean needCache = needCache(ts, sharedTableMap);
         mapInput = createMapInput(jobConf, sparkPlan, mapWork, needCache);
         if (needCache) {
-- 
1.8.4.GIT


From 873568b0c71a7947ea1b3c1d48ed88e03db24762 Mon Sep 17 00:00:00 2001
From: kellyzly
Date: Sun, 31 Dec 2017 06:42:55 +0800
Subject: [PATCH 10/11] enable the RDD cache to reproduce the NPE issue

---
 .../java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index e1fe019..ce4ac58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -197,7 +197,8 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean
     JobConf jobConf = cloneJobConf(mapWork);
     MapInput mapInput = null;
     if (jobConf.getBoolean(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION.varname, false) == false) {
-      mapInput = createMapInput(jobConf, sparkPlan, mapWork, false);
+      boolean enabledRDDCache = jobConf.getBoolean("spark.rdd.cache", false);
+      mapInput = createMapInput(jobConf, sparkPlan, mapWork, enabledRDDCache);
     } else {
       Operator operator = mapWork.getAllRootOperators().iterator().next();
       TableScanOperator ts = (TableScanOperator) operator;
-- 
1.8.4.GIT


From 56815c2f9a5e89618861f828e6f33ffbb2803b5c Mon Sep 17 00:00:00 2001
From: kellyzly
Date: Sun, 31 Dec 2017 07:06:31 +0800
Subject: [PATCH 11/11] add logging in initIOContext

---
 .../org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 4bc60dc..a2c3165 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -172,6 +172,9 @@ private void initIOContext(long startPos, boolean isBlockPointer,
     ioCxtRef.setBlockPointer(isBlockPointer);
     ioCxtRef.setInputPath(inputPath);
     LOG.debug("Processing file " + inputPath); // Logged at INFO in multiple other places.
+    for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
+      LOG.info(ste.toString());
+    }
     initDone = true;
   }
-- 
1.8.4.GIT
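A closing note on the diagnostics added in patch 11: iterating Thread.currentThread().getStackTrace() and logging every frame at INFO runs on each record-reader initialization, which is hot-path code. If the trace is worth keeping, a cheaper pattern is to emit it as a single guarded event; a sketch, with a hypothetical helper name:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class TraceLogging {
      private static final Logger LOG = LoggerFactory.getLogger(TraceLogging.class);

      private TraceLogging() {
      }

      // Logs the caller's stack as one event, and only when debug is enabled,
      // so production runs do not pay the trace-capture cost per record reader.
      static void logCallerStack(String context) {
        if (LOG.isDebugEnabled()) {
          // slf4j treats a trailing Throwable argument as the exception to print.
          LOG.debug("stack at {}", context, new Throwable("stack trace"));
        }
      }
    }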