diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3be5a8d..d886f34 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1688,6 +1688,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true, "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."), + HIVE_SPARK_SHARED_WORK_OPTIMIZATION("hive.spark.optimize.shared.work", false, + "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + + "and follow-up operators in the query plan and merges them if they meet some preconditions."), HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " + "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " + "work objects and combines them if they meet certain preconditions. Spark only."), diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index cca1055..2ac0344 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1514,7 +1514,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\ spark_use_ts_stats_for_mapjoin.q,\ spark_use_op_stats.q,\ spark_explain_groupbyshuffle.q,\ - spark_opt_shuffle_serde.q + spark_opt_shuffle_serde.q, miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ bucket4.q,\ @@ -1581,7 +1581,8 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ # ql_rewrite_gbtoidx_cbo_1.q,\ # smb_mapjoin_8.q,\ -localSpark.only.query.files=spark_local_queries.q +localSpark.only.query.files=spark_local_queries.q,\ + spark_optimize_shared_work.q spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\ groupby2_multi_distinct.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index a5a56ea..c54ef84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -28,7 +28,6 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -430,7 +429,7 @@ public void setChildren(Configuration hconf) throws Exception { Configuration newConf = tableNameToConf.get(tableDesc.getTableName()); for (String alias : aliases) { - Operator op = conf.getAliasToWork().get(alias); + Operator op = getRootOfMapWork(alias); if (LOG.isDebugEnabled()) { LOG.debug("Adding alias " + alias + " to work list for file " + onefile); @@ -460,6 +459,10 @@ public void setChildren(Configuration hconf) throws Exception { setChildOperators(children); } + protected Operator getRootOfMapWork(String alias) { + return conf.getAliasToWork().get(alias); + } + private void initOperatorContext(List> children) throws HiveException { for (Map, MapOpCtx> contexts : opCtxMap.values()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMapOperator2.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMapOperator2.java new file mode 100644 index 0000000..ba0f18c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMapOperator2.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.CompilationOpContext; + +/** + * The second M in the M->M->R pipeline of Hive on Spark (HIVE-17486). + */ +public class SparkMapOperator2 extends MapOperator { + + private Operator root = null; + + public SparkMapOperator2(CompilationOpContext ctx) { + super(ctx); + } + + protected Operator getRootOfMapWork(String alias) { + return root; + } + + public void setRootOfMapWork(Operator root) { + this.root = root; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java index 8f397fa..956dce3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java @@ -148,4 +148,5 @@ public void setFetchOperators(Map fetchOperators) { public IOContext getIoCxt() { return ioCxt; } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java index c177a43..1a52f62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java @@ -22,6 +22,8 @@ import java.util.Iterator; import java.util.List; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hive.ql.exec.SparkMapOperator2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -75,7 +77,15 @@ if (mrwork.getVectorMode()) { mo = new VectorMapOperator(runtimeCtx); } else { - mo = new MapOperator(runtimeCtx); + if (!job.getBoolean("hive.spark.optimize.shared.work", false)) { + mo = new MapOperator(runtimeCtx); + } else { + mo = new SparkMapOperator2(runtimeCtx); + Preconditions.checkArgument(mrwork.getAllRootOperators().size() == 1, + "AssertionError: expected a single root operator in the MapWork when hive.spark.optimize.shared.work is enabled"); + Operator rootOp = mrwork.getAllRootOperators().iterator().next(); + ((SparkMapOperator2) mo).setRootOfMapWork(rootOp); + } } mo.setConf(mrwork); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 079ec42..15b2ae4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -104,14 +104,43 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception { workToParentWorkTranMap.clear(); try { - for (BaseWork work : sparkWork.getAllWork()) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); - SparkTran tran = generate(work, sparkWork); - SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work); - sparkPlan.addTran(tran); - sparkPlan.connect(parentTran, tran); - workToTranMap.put(work, tran); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); + if (jobConf.getBoolean("hive.spark.optimize.shared.work", true) == false) { + for (BaseWork work : sparkWork.getAllWork()) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); + SparkTran tran = generate(work, sparkWork); + SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work); + sparkPlan.addTran(tran); + sparkPlan.connect(parentTran, tran); + workToTranMap.put(work, tran); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); + } + } else { + for (BaseWork work : sparkWork.getAllWork()) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); + SparkTran tran; + SparkTran mapInput = null; + if (work instanceof MapWork && work.getAllOperators().size() == 1) { + //MapInput + tran = generateMapInput(sparkPlan, (MapWork) work,isCachingWork(work,sparkWork)); + sparkPlan.addTran(tran); + } else if (work instanceof MapWork && work.getAllOperators().size() > 1) { + ((MapWork) work).setSkipInitializeFileInputFormat(true); + tran = generate(work, sparkWork); + sparkPlan.addTran(tran); + BaseWork parent = sparkWork.getParents(work).get(0); + Preconditions.checkArgument(workToTranMap.containsKey(parent), + "AssertionError: expected workToTranMap.containsKey(parent) to be true"); + SparkTran parentTran = workToTranMap.get(parent); + sparkPlan.connect(parentTran, tran); + } else { + SparkTran shuffleTran = generateParentTran(sparkPlan, sparkWork, work); + tran = generate(work, sparkWork); + sparkPlan.addTran(tran); + sparkPlan.connect(shuffleTran, tran); + } + workToTranMap.put(work, tran); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); + } } } finally { // clear all ThreadLocal cached MapWork/ReduceWork after plan generation @@ -135,7 +164,7 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, SparkTran result; if (work instanceof MapWork) { - result = generateMapInput(sparkPlan, (MapWork)work); + result = generateMapInput(sparkPlan, (MapWork)work, isCachingWork(work, sparkWork)); sparkPlan.addTran(result); } else if (work instanceof ReduceWork) { List parentWorks = sparkWork.getParents(work); @@ -184,8 +213,9 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, } @SuppressWarnings("unchecked") - private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) + private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork, boolean cached) throws Exception { + LOG.info("MapWork :"+mapWork.getName()+ " cached:"+cached); JobConf jobConf = cloneJobConf(mapWork); Class ifClass = getInputFormat(jobConf, mapWork); @@ -199,7 +229,8 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) } // Caching is disabled for MapInput due to HIVE-8920 - MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + 
// MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + MapInput result = new MapInput(sparkPlan, hadoopRDD, cached); return result; } @@ -296,11 +327,18 @@ private JobConf cloneJobConf(BaseWork work) throws Exception { if (work instanceof MapWork) { MapWork mapWork = (MapWork) work; cloned.setBoolean("mapred.task.is.map", true); - List inputPaths = Utilities.getInputPaths(cloned, mapWork, - scratchDir, context, false); - Utilities.setInputPaths(cloned, inputPaths); + // HIVE-17486: M-R structure is changed to M-M-R when enabling hive.spark.optimize.shared.work. + // for the first M, we need initialize FileInputFormat by Utilities#getInputPaths + // for the second M, we need not initialize FileInputFormat + if (!mapWork.isSkipInitializeFileInputFormat()) { + List inputPaths = Utilities.getInputPaths(cloned, mapWork, + scratchDir, context, false); + Utilities.setInputPaths(cloned, inputPaths); + } Utilities.setMapWork(cloned, mapWork, scratchDir, false); - Utilities.createTmpDirs(cloned, mapWork); + if (!mapWork.isSkipInitializeFileInputFormat()) { + Utilities.createTmpDirs(cloned, mapWork); + } if (work instanceof MergeFileWork) { MergeFileWork mergeFileWork = (MergeFileWork) work; cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java new file mode 100644 index 0000000..8a90256 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.Operator; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +public class SharedResult { + public final List> retainableOps; + public final List> discardableOps; + public final Set> discardableInputOps; + public final long dataSize; + public final long maxDataSize; + + public SharedResult(Collection> retainableOps, Collection> discardableOps, + Set> discardableInputOps, long dataSize, long maxDataSize) { + this.retainableOps = ImmutableList.copyOf(retainableOps); + this.discardableOps = ImmutableList.copyOf(discardableOps); + this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps); + this.dataSize = dataSize; + this.maxDataSize = maxDataSize; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index d4ddb75..f3df8e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -36,6 +36,7 @@ import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.GenTezUtils; import org.apache.hadoop.hive.ql.parse.ParseContext; import 
org.apache.hadoop.hive.ql.parse.PrunedPartitionList; @@ -263,13 +265,26 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { GenTezUtils.removeSemiJoinOperator( pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); } - } else if (op instanceof AppMasterEventOperator) { - DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); - if (!sr.discardableOps.contains(dped.getTableScan()) && - !sr.discardableInputOps.contains(dped.getTableScan())) { - GenTezUtils.removeSemiJoinOperator( - pctx, (AppMasterEventOperator) op, dped.getTableScan()); + } else if (op.getConf() instanceof DynamicPruningEventDesc || op.getConf() instanceof SparkPartitionPruningSinkDesc) { + if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + DynamicPruningEventDesc dped =(DynamicPruningEventDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + GenTezUtils.removeSemiJoinOperator( + pctx, (AppMasterEventOperator) op, dped.getTableScan()); + } + } else if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + SparkPartitionPruningSinkDesc dped = (SparkPartitionPruningSinkDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + // current in Hive on Spark,there is no similar feature like "hive.tez.dynamic.semijoin.reduction(HIVE-16862) + /*GenTezUtils.removeSemiJoinOperator( + pctx, (AppMasterEventOperator) op, dped.getTableScan());*/ + } } + } LOG.debug("Input operator removed: {}", op); } @@ -361,7 +376,7 @@ private static void gatherDPPTableScanOps( LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); } - private static Multimap splitTableScanOpsByTable( + public static Multimap splitTableScanOpsByTable( ParseContext pctx) { Multimap tableNameToOps = ArrayListMultimap.create(); // Sort by operator ID so we get deterministic results @@ -375,7 +390,7 @@ private static void gatherDPPTableScanOps( return tableNameToOps; } - private static List> rankTablesByAccumulatedSize(ParseContext pctx) { + public static List> rankTablesByAccumulatedSize(ParseContext pctx) { Map tableToTotalSize = new HashMap<>(); for (Entry e : pctx.getTopOps().entrySet()) { TableScanOperator tsOp = e.getValue(); @@ -402,7 +417,7 @@ public int compare(Map.Entry o1, Map.Entry o2) { return sortedTables; } - private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, + public static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException { // First we check if the two table scan operators can actually be merged // If schemas do not match, we currently do not merge @@ -486,11 +501,11 @@ private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache return true; } - private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, + public static SharedResult extractSharedOptimizationInfo(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator retainableTsOp, TableScanOperator discardableTsOp) throws SemanticException { - Set> retainableOps = new LinkedHashSet<>(); + Set> retainableOps = new LinkedHashSet<>(); Set> discardableOps = new LinkedHashSet<>(); Set> discardableInputOps = new HashSet<>(); long dataSize = 0l; @@ -591,10 +606,18 @@ private static 
SharedResult extractSharedOptimizationInfo(ParseContext pctx, equalOp2 = currentOp2; retainableOps.add(equalOp1); discardableOps.add(equalOp2); + long noconditionalTaskSize = pctx.getConf().getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); if (equalOp1 instanceof MapJoinOperator) { MapJoinOperator mop = (MapJoinOperator) equalOp1; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } if (currentOp1.getChildOperators().size() > 1 || currentOp2.getChildOperators().size() > 1) { @@ -608,11 +631,19 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, // Add the rest to the memory consumption Set> opsWork1 = findWorkOperators(optimizerCache, currentOp1); + long noconditionalTaskSize = pctx.getConf().getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); for (Operator op : opsWork1) { if (op instanceof MapJoinOperator && !retainableOps.contains(op)) { MapJoinOperator mop = (MapJoinOperator) op; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } } Set> opsWork2 = findWorkOperators(optimizerCache, currentOp2); @@ -620,7 +651,13 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, if (op instanceof MapJoinOperator && !discardableOps.contains(op)) { MapJoinOperator mop = (MapJoinOperator) op; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } } @@ -797,7 +834,7 @@ private static boolean compareOperator(ParseContext pctx, Operator op1, Opera return op1.logicalEquals(op2); } - private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, + public static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, SharedResult sr) { // We check whether merging the works would cause the size of @@ -1012,11 +1049,20 @@ private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizer if (sjbi != null) { set.addAll(findWorkOperators(optimizerCache, sjbi.getTsOp())); } - } else if(op.getConf() instanceof DynamicPruningEventDesc) { + } else if(op.getConf() instanceof DynamicPruningEventDesc || op.getConf() instanceof 
SparkPartitionPruningSinkDesc) { // DPP work is considered a descendant because work needs // to finish for it to execute - set.addAll(findWorkOperators( - optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan())); + if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + //for tez + set.addAll(findWorkOperators( + optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan())); + }else if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + //for spark + set.addAll(findWorkOperators( + optimizerCache, ((SparkPartitionPruningSinkDesc) op.getConf()).getTableScan())); + } } } workOps = set; @@ -1064,7 +1110,7 @@ private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizer return found; } - private static void pushFilterToTopOfTableScan( + public static void pushFilterToTopOfTableScan( SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp) throws UDFArgumentException { ExprNodeGenericFuncDesc tableScanExprNode = tsOp.getConf().getFilterExpr(); @@ -1104,75 +1150,4 @@ private static void pushFilterToTopOfTableScan( } } } - - private static class SharedResult { - final List> retainableOps; - final List> discardableOps; - final Set> discardableInputOps; - final long dataSize; - final long maxDataSize; - - private SharedResult(Collection> retainableOps, Collection> discardableOps, - Set> discardableInputOps, long dataSize, long maxDataSize) { - this.retainableOps = ImmutableList.copyOf(retainableOps); - this.discardableOps = ImmutableList.copyOf(discardableOps); - this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps); - this.dataSize = dataSize; - this.maxDataSize = maxDataSize; - } - } - - /** Cache to accelerate optimization */ - private static class SharedWorkOptimizerCache { - // Operators that belong to each work - final HashMultimap, Operator> operatorToWorkOperators = - HashMultimap., Operator>create(); - // Table scan operators to DPP sources - final Multimap> tableScanToDPPSource = - HashMultimap.>create(); - - // Add new operator to cache work group of existing operator (if group exists) - void putIfWorkExists(Operator opToAdd, Operator existingOp) { - List> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp)); - if (!c.isEmpty()) { - for (Operator op : c) { - operatorToWorkOperators.get(op).add(opToAdd); - } - operatorToWorkOperators.putAll(opToAdd, c); - operatorToWorkOperators.put(opToAdd, opToAdd); - } - } - - // Remove operator - void removeOp(Operator opToRemove) { - Set> s = operatorToWorkOperators.get(opToRemove); - s.remove(opToRemove); - List> c1 = ImmutableList.copyOf(s); - if (!c1.isEmpty()) { - for (Operator op1 : c1) { - operatorToWorkOperators.remove(op1, opToRemove); // Remove operator - } - operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator - } - } - - // Remove operator and combine - void removeOpAndCombineWork(Operator opToRemove, Operator replacementOp) { - Set> s = operatorToWorkOperators.get(opToRemove); - s.remove(opToRemove); - List> c1 = ImmutableList.copyOf(s); - List> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp)); - if (!c1.isEmpty() && !c2.isEmpty()) { - for (Operator op1 : c1) { - operatorToWorkOperators.remove(op1, opToRemove); // Remove operator - operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection - } - operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator - for (Operator op2 : c2) { - 
operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection - } - } - } - } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java new file mode 100644 index 0000000..0b72a07 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; + +import java.util.List; +import java.util.Set; + +/** + * Cache to accelerate optimization + */ +public class SharedWorkOptimizerCache { + // Operators that belong to each work + final HashMultimap, Operator> operatorToWorkOperators = + HashMultimap., Operator>create(); + // Table scan operators to DPP sources + public final Multimap> tableScanToDPPSource = + HashMultimap.>create(); + + // Add new operator to cache work group of existing operator (if group exists) + void putIfWorkExists(Operator opToAdd, Operator existingOp) { + List> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp)); + if (!c.isEmpty()) { + for (Operator op : c) { + operatorToWorkOperators.get(op).add(opToAdd); + } + operatorToWorkOperators.putAll(opToAdd, c); + operatorToWorkOperators.put(opToAdd, opToAdd); + } + } + + // Remove operator + public void removeOp(Operator opToRemove) { + Set> s = operatorToWorkOperators.get(opToRemove); + s.remove(opToRemove); + List> c1 = ImmutableList.copyOf(s); + if (!c1.isEmpty()) { + for (Operator op1 : c1) { + operatorToWorkOperators.remove(op1, opToRemove); // Remove operator + } + operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator + } + } + + // Remove operator and combine + public void removeOpAndCombineWork(Operator opToRemove, Operator replacementOp) { + Set> s = operatorToWorkOperators.get(opToRemove); + s.remove(opToRemove); + List> c1 = ImmutableList.copyOf(s); + List> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp)); + if (!c1.isEmpty() && !c2.isEmpty()) { + for (Operator op1 : c1) { + operatorToWorkOperators.remove(op1, opToRemove); // Remove operator + operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection + } + operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator + for (Operator op2 : c2) { + operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection + } + } + } +} diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index 8cedbe5..8d6c7be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -99,6 +99,9 @@ LOG.info("Convert to non-bucketed map join"); MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); + //Refer tez code ConvertJoinMapJoin.estimateNumBuckets, if numBuckets <0, then defaultNumBucket is 1 + int defaultNumBucket = 1; + mapJoinOp.getConf().setInMemoryDataSize(mapJoinInfo[2] / defaultNumBucket); // For native vectorized map join, we require the key SerDe to be BinarySortableSerDe // Note: the MJ may not really get natively-vectorized later, // but changing SerDe won't hurt correctness @@ -115,6 +118,9 @@ LOG.info("Converted to map join with " + numBuckets + " buckets"); bucketColNames = joinOp.getOpTraits().getBucketColNames(); mapJoinInfo[2] /= numBuckets; + //Refer tez code in ConvertJoinMapJoin.getMapJoinConversionPos() + mapJoinOp.getConf().setInMemoryDataSize(mapJoinInfo[2]); + } else { LOG.info("Can not convert to bucketed map join"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java new file mode 100644 index 0000000..4823b56 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.spark; + +import com.google.common.collect.Lists; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.optimizer.SharedResult; +import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer; +import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizerCache; +import org.apache.hadoop.hive.ql.optimizer.Transform; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; +import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ArrayListMultimap; +/** + * Shared computation optimizer. + * + *

Originally, this rule would find scan operators over the same table
+ * in the query plan and merge them if they met some preconditions.
+ *
+ *      TS    TS              TS
+ *      |     |       ->     /  \
+ *     Op     Op            Op  Op
+ *
+ *

A limitation in the current implementation is that the optimizer does not + * go beyond a work boundary. + * + *

The optimization works with the Spark execution engine. + **/ + +public class SparkSharedWorkOptimizer extends Transform { + + private final static Logger LOG = LoggerFactory.getLogger(SparkSharedWorkOptimizer.class); + + private HashMap replaceFilterTSMap = new HashMap(); + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + final Map topOps = pctx.getTopOps(); + if (topOps.size() < 2) { + // Nothing to do, bail out + return pctx; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Before SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + // Cache to use during optimization + SharedWorkOptimizerCache optimizerCache = new SharedWorkOptimizerCache(); + + // Gather information about the DPP table scans and store it in the cache + gatherDPPTableScanOps(pctx, optimizerCache); + + // Map of dbName.TblName -> TSOperator + Multimap tableNameToOps = SharedWorkOptimizer.splitTableScanOpsByTable(pctx); + + // We enforce a certain order when we do the reutilization. + // In particular, we use size of table x number of reads to + // rank the tables. + List> sortedTables = SharedWorkOptimizer.rankTablesByAccumulatedSize(pctx); + LOG.debug("Sorted tables by size: {}", sortedTables); + + // Execute optimization + Multimap existingOps = ArrayListMultimap.create(); + Set> removedOps = new HashSet<>(); + for (Map.Entry tablePair : sortedTables) { + String tableName = tablePair.getKey(); + for (TableScanOperator discardableTsOp : tableNameToOps.get(tableName)) { + if (removedOps.contains(discardableTsOp)) { + LOG.debug("Skip {} as it has been already removed", discardableTsOp); + continue; + } + Collection prevTsOps = existingOps.get(tableName); + for (TableScanOperator retainableTsOp : prevTsOps) { + if (removedOps.contains(retainableTsOp)) { + LOG.debug("Skip {} as it has been already removed", retainableTsOp); + continue; + } + + // First we quickly check if the two table scan operators can actually be merged + boolean mergeable = SharedWorkOptimizer.areMergeable(pctx, optimizerCache, retainableTsOp, discardableTsOp); + if (!mergeable) { + // Skip + LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp); + continue; + } + + // Secondly, we extract information about the part of the tree that can be merged + // as well as some structural information (memory consumption) that needs to be + // used to determined whether the merge can happen + SharedResult sr = SharedWorkOptimizer.extractSharedOptimizationInfo( + pctx, optimizerCache, retainableTsOp, discardableTsOp); + + // It seems these two operators can be merged. + // Check that plan meets some preconditions before doing it. + // In particular, in the presence of map joins in the upstream plan: + // - we cannot exceed the noconditional task size, and + // - if we already merged the big table, we cannot merge the broadcast + // tables. 
+ if (!SharedWorkOptimizer.validPreConditions(pctx, optimizerCache, sr)) { + // Skip + LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp); + continue; + } + + // We can merge + if (sr.retainableOps.size() > 1) { + //TODO find the suitable case for this branch + // More than TS operator + Operator lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1); + Operator lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1); + if (lastDiscardableOp.getNumChild() != 0) { + List> allChildren = + Lists.newArrayList(lastDiscardableOp.getChildOperators()); + for (Operator op : allChildren) { + lastDiscardableOp.getChildOperators().remove(op); + op.replaceParent(lastDiscardableOp, lastRetainableOp); + lastRetainableOp.getChildOperators().add(op); + } + } + + LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); + } else { + // Only TS operator + ExprNodeGenericFuncDesc exprNode = null; + if (retainableTsOp.getConf().getFilterExpr() != null) { + // Push filter on top of children + SharedWorkOptimizer.pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + // Clone to push to table scan + exprNode = (ExprNodeGenericFuncDesc) retainableTsOp.getConf().getFilterExpr(); + } + if (discardableTsOp.getConf().getFilterExpr() != null) { + // Push filter on top + SharedWorkOptimizer.pushFilterToTopOfTableScan(optimizerCache, discardableTsOp); + ExprNodeGenericFuncDesc tsExprNode = discardableTsOp.getConf().getFilterExpr(); + if (exprNode != null && !exprNode.isSame(tsExprNode)) { + // We merge filters from previous scan by ORing with filters from current scan + if (exprNode.getGenericUDF() instanceof GenericUDFOPOr) { + List newChildren = new ArrayList<>(exprNode.getChildren().size() + 1); + for (ExprNodeDesc childExprNode : exprNode.getChildren()) { + if (childExprNode.isSame(tsExprNode)) { + // We do not need to do anything, it is in the OR expression + break; + } + newChildren.add(childExprNode); + } + if (exprNode.getChildren().size() == newChildren.size()) { + newChildren.add(tsExprNode); + exprNode = ExprNodeGenericFuncDesc.newInstance( + new GenericUDFOPOr(), + newChildren); + } + } else { + exprNode = ExprNodeGenericFuncDesc.newInstance( + new GenericUDFOPOr(), + Arrays.asList(exprNode, tsExprNode)); + } + } + } + // Replace filter + retainableTsOp.getConf().setFilterExpr(exprNode); + replaceFilterTSMap.put(discardableTsOp, retainableTsOp); + LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp); + } + + // First we remove the input operators of the expression that + // we are going to eliminate + for (Operator op : sr.discardableInputOps) { + //TODO Verify we need optimizerCache.removeOp(op) + optimizerCache.removeOp(op); + removedOps.add(op); + // Remove DPP predicates + if (op instanceof ReduceSinkOperator) { + SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op); + if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) && + !sr.discardableInputOps.contains(sjbi.getTsOp())) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator( +// pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); + } + } else if (op instanceof AppMasterEventOperator) { + DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator( +// pctx, 
(AppMasterEventOperator) op, dped.getTableScan()); + } + } + LOG.debug("Input operator removed: {}", op); + } + + removedOps.add(discardableTsOp); + // Finally we remove the expression from the tree + for (Operator op : sr.discardableOps) { + //TODO Verify we need optimizerCache.removeOp(op) + optimizerCache.removeOp(op); + removedOps.add(op); + if (sr.discardableOps.size() == 1) { + // If there is a single discardable operator, it is a TableScanOperator + // and it means that we have merged filter expressions for it. Thus, we + // might need to remove DPP predicates from the retainable TableScanOperator + Collection> c = + optimizerCache.tableScanToDPPSource.get((TableScanOperator) op); + for (Operator dppSource : c) { + if (dppSource instanceof ReduceSinkOperator) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator(pctx, +// (ReduceSinkOperator) dppSource, +// (TableScanOperator) sr.retainableOps.get(0)); + } else if (dppSource instanceof AppMasterEventOperator) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator(pctx, +// (AppMasterEventOperator) dppSource, +// (TableScanOperator) sr.retainableOps.get(0)); + } + } + } + LOG.debug("Operator removed: {}", op); + } + + break; + } + + if (removedOps.contains(discardableTsOp)) { + // This operator has been removed, remove it from the list of existing operators + existingOps.remove(tableName, discardableTsOp); + } else { + // This operator has not been removed, include it in the list of existing operators + existingOps.put(tableName, discardableTsOp); + } + } + } + + // Remove unused table scan operators + Iterator> it = topOps.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + if (e.getValue().getNumChild() == 0) { + it.remove(); + } + } + + for(TableScanOperator discardableTsOp:replaceFilterTSMap.keySet()){ + TableScanOperator retainedTsOp = replaceFilterTSMap.get(discardableTsOp); + if( retainedTsOp.getConf().getFilterExpr()!= null) { + discardableTsOp.getConf().setFilterExpr( + retainedTsOp.getConf().getFilterExpr()); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + + return pctx; + } + + private static void gatherDPPTableScanOps( + ParseContext pctx, SharedWorkOptimizerCache optimizerCache) throws SemanticException { + // Find TS operators with partition pruning enabled in plan + // because these TS may potentially read different data for + // different pipeline. + // These can be: + // 1) TS with DPP. + // 2) TS with semijoin DPP. 
+ Map topOps = pctx.getTopOps(); + Collection> tableScanOps = + Lists.>newArrayList(topOps.values()); + Set s = + OperatorUtils.findOperators(tableScanOps, SparkPartitionPruningSinkOperator.class); + for (SparkPartitionPruningSinkOperator a : s) { + if (a.getConf() instanceof SparkPartitionPruningSinkDesc) { + SparkPartitionPruningSinkDesc dped = (SparkPartitionPruningSinkDesc) a.getConf(); + optimizerCache.tableScanToDPPSource.put(dped.getTableScan(), a); + } + } + for (Map.Entry e + : pctx.getRsToSemiJoinBranchInfo().entrySet()) { + optimizerCache.tableScanToDPPSource.put(e.getValue().getTsOp(), e.getKey()); + } + LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 604c8ae..b9ad51e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -155,15 +155,19 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root, public MapWork createMapWork(GenSparkProcContext context, Operator root, SparkWork sparkWork, PrunedPartitionList partitions, boolean deferSetup) throws SemanticException { - Preconditions.checkArgument(root.getParentOperators().isEmpty(), - "AssertionError: expected root.getParentOperators() to be empty"); + if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + Preconditions.checkArgument(root.getParentOperators().isEmpty(), + "AssertionError: expected root.getParentOperators() to be empty"); + } MapWork mapWork = new MapWork("Map " + (++sequenceNumber)); LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root); // map work starts with table scan operators - Preconditions.checkArgument(root instanceof TableScanOperator, - "AssertionError: expected root to be an instance of TableScanOperator, but was " - + root.getClass().getName()); + if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + Preconditions.checkArgument(root instanceof TableScanOperator, + "AssertionError: expected root to be an instance of TableScanOperator, but was " + + root.getClass().getName()); + } String alias_id = null; if (context.parseContext != null && context.parseContext.getTopOps() != null) { for (String currentAliasID : context.parseContext.getTopOps().keySet()) { @@ -175,9 +179,17 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root, } } if (alias_id == null) - alias_id = ((TableScanOperator) root).getConf().getAlias(); + if( root instanceof TableScanOperator ) { + alias_id = ((TableScanOperator) root).getConf().getAlias(); + }else { + //If context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)==true, + //the root of operator tree maybe not TableScan , more detailed see HIVE-17486 + alias_id = root.getOperatorId(); + } if (!deferSetup) { - setupMapWork(mapWork, context, partitions,(TableScanOperator) root, alias_id); + if( root instanceof TableScanOperator) { + setupMapWork(mapWork, context, partitions, (TableScanOperator) root, alias_id); + } } // add new item to the Spark work diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java index 5385d5e..d0ae07b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -39,7 +40,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; -import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -60,9 +60,14 @@ */ public class GenSparkWork implements NodeProcessor { static final private Logger LOG = LoggerFactory.getLogger(GenSparkWork.class.getName()); - + //HIVE-17486: TODO we need define the num partition number for the sparkEdgeProperty between Map(TS) and Map( the child of TS-RS) + //More detailed see HIVE-17486 + static final int MAP_PARTITIONS = 1000; // instance of shared utils private GenSparkUtils utils = null; + //HIVE-17486: change M-R to M-M-R + //hasSplitTS: the first M of M-M-R has been dealt with. + private boolean hasSplitTS = false; /** * Constructor takes utils as parameter to facilitate testing @@ -94,6 +99,10 @@ public Object process(Node nd, Stack stack, LOG.debug("Root operator: " + root); LOG.debug("Leaf operator: " + operator); + + if (root == operator) { + hasSplitTS = false; + } SparkWork sparkWork = context.currentTask.getWork(); SMBMapJoinOperator smbOp = GenSparkUtils.getChildOperator(root, SMBMapJoinOperator.class); @@ -119,10 +128,30 @@ public Object process(Node nd, Stack stack, } else { //save work to be initialized later with SMB information. 
work = utils.createMapWork(context, root, sparkWork, null, true); + context.smbMapJoinCtxMap.get(smbOp).mapWork = (MapWork) work; } } else { - work = utils.createReduceWork(context, root, sparkWork); + //put the operators from the child of TS to reduceSink to another Map + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION) && !hasSplitTS) { + work = utils.createMapWork(context, root, sparkWork, null); + MapWork mapWork = (MapWork) work; + MapWork preceedingMapWork = (MapWork) context.preceedingWork; + //use the pathToAlias of preceeingMapWork( the first M of M-M-R) to initialize the second M + mapWork.setPathToAliases(preceedingMapWork.getPathToAliases()); + //use the PathToPartitionInfo of preceeingMapWork( the first M of M-M-R) to initialize the second M + mapWork.setPathToPartitionInfo(preceedingMapWork.getPathToPartitionInfo()); + //If context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)==true, + //use root(the child of TS)'s operatorId as the alias + mapWork.getAliasToWork().put(root.getOperatorId(), root); + SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE); + edgeProperty.setNumPartitions(MAP_PARTITIONS); + sparkWork.connect(context.preceedingWork, work, edgeProperty); + removeParent(root); + hasSplitTS = true; + }else { + work = utils.createReduceWork(context, root, sparkWork); + } } context.rootToWorkMap.put(root, work); } @@ -202,7 +231,7 @@ public Object process(Node nd, Stack stack, // a few information, since in future we may reach the parent operators via a // different path, and we may need to connect parent works with the work associated // with this root operator. - if (root.getNumParent() > 0) { + if (root.getNumParent() > 0 && !(root.getParentOperators().get(0) instanceof TableScanOperator)) { Preconditions.checkArgument(work instanceof ReduceWork, "AssertionError: expected work to be a ReduceWork, but was " + work.getClass().getName()); ReduceWork reduceWork = (ReduceWork) work; @@ -277,9 +306,11 @@ public Object process(Node nd, Stack stack, // No children means we're at the bottom. If there are more operators to scan // the next item will be a new root. 
if (!operator.getChildOperators().isEmpty()) { - Preconditions.checkArgument(operator.getChildOperators().size() == 1, - "AssertionError: expected operator.getChildOperators().size() to be 1, but was " - + operator.getChildOperators().size()); + if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + Preconditions.checkArgument(operator.getChildOperators().size() == 1, + "AssertionError: expected operator.getChildOperators().size() to be 1, but was " + + operator.getChildOperators().size()); + } context.parentOfRoot = operator; context.currentRootOperator = operator.getChildOperators().get(0); context.preceedingWork = work; @@ -287,4 +318,15 @@ public Object process(Node nd, Stack stack, return null; } + + //remove the parent relation for operator + private void removeParent(Operator operator) { + if (operator.getNumParent() > 0) { + Preconditions.checkArgument(operator.getParentOperators().size() == 1, + "AssertionError: expected operator.getParentOperators().size() to be 1, but was " + + operator.getParentOperators().size()); + Operator parent = operator.getParentOperators().get(0); + operator.removeParent(parent); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 965044d..d127b1c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -28,6 +28,9 @@ import java.util.Stack; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; @@ -81,6 +84,7 @@ import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkSharedWorkOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver; import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver; import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics; @@ -138,6 +142,10 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, new ConstantPropagate(ConstantPropagateProcCtx.ConstantPropagateOption.SHORTCUT).transform(pCtx); } + if (procCtx.getParseContext().getConf().getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + new SparkSharedWorkOptimizer().transform(procCtx.getParseContext()); + } + PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE); } @@ -414,8 +422,20 @@ private void generateTaskTreeHelper(GenSparkProcContext procCtx, List topN throws SemanticException { // create a walker which walks the tree in a DFS manner while maintaining // the operator stack. 
The dispatcher generates the plan from the operator tree - Map opRules = new LinkedHashMap(); + HashMap opRules1 = new LinkedHashMap(); + opRules1.put(new RuleRegExp("Handle Analyze Command", + TableScanOperator.getOperatorName() + "%"), + new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())); + Dispatcher disp1 = new DefaultRuleDispatcher(null, opRules1, procCtx); + GraphWalker ogw1 = new GenSparkWorkWalker(disp1, procCtx); + ogw1.startWalking(topNodes, null); + + GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils()); + HashMap opRules = new LinkedHashMap(); + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + opRules.put(new RuleRegExp("Split work -TableScan", TableScanOperator.getOperatorName() + "%"), genSparkWork); + } opRules.put(new RuleRegExp("Split Work - ReduceSink", ReduceSinkOperator.getOperatorName() + "%"), genSparkWork); @@ -429,9 +449,7 @@ private void generateTaskTreeHelper(GenSparkProcContext procCtx, List topN FileSinkOperator.getOperatorName() + "%"), new CompositeProcessor(new SparkFileSinkProcessor(), genSparkWork)); - opRules.put(new RuleRegExp("Handle Analyze Command", - TableScanOperator.getOperatorName() + "%"), - new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())); + opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"), new NodeProcessor() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java new file mode 100644 index 0000000..c2c8e76 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.spark; + +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.Stack; + +public class SparkRuleDispatcher implements Dispatcher { + private final Multimap procRules; + private final NodeProcessorCtx procCtx; + private final Set defaultProcSet; + + /** + * Constructor. 
+ * + * @param defaultProc default processor to be fired if no rule matches + * @param rules operator processor that handles actual processing of the node + * @param procCtx operator processor context, which is opaque to the dispatcher + */ + public SparkRuleDispatcher(NodeProcessor defaultProc, + Multimap rules, NodeProcessorCtx procCtx) { + this.defaultProcSet = new HashSet(); + this.defaultProcSet.add(defaultProc); + procRules = rules; + this.procCtx = procCtx; + } + + /** + * Dispatcher function. + * + * @param nd operator to process + * @param ndStack the operators encountered so far + * @throws SemanticException + */ + @Override + public Object dispatch(Node nd, Stack ndStack, Object... nodeOutputs) + throws SemanticException { + + // find the firing rule + // find the rule from the stack specified + Rule rule = null; + int minCost = Integer.MAX_VALUE; + for (Rule r : procRules.keySet()) { + int cost = r.cost(ndStack); + if ((cost >= 0) && (cost < minCost)) { + minCost = cost; + rule = r; + } + } + + Collection procSet; + + if (rule == null) { + procSet = defaultProcSet; + } else { + procSet = procRules.get(rule); + } + + // Do nothing in case proc is null + Object ret = null; + for (NodeProcessor proc : procSet) { + if (proc != null) { + // Call the process function + ret = proc.process(nd, ndStack, procCtx, nodeOutputs); + } + } + return ret; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index e466b32..eac8c3e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.plan; import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import java.util.ArrayList; @@ -27,14 +26,12 @@ import java.util.BitSet; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -160,6 +157,9 @@ private String llapIoDesc; private boolean isMergeFromResolver; + //HIVE-17486: Change M-R to M-M-R in Hive on Spark + //skipInitializeFileInputFormat is skip initializing FileInputFormat of second M + private boolean skipInitializeFileInputFormat; public MapWork() {} @@ -867,4 +867,12 @@ public MapExplainVectorization getMapExplainVectorization() { } return new MapExplainVectorization(this); } + + public boolean isSkipInitializeFileInputFormat() { + return skipInitializeFileInputFormat; + } + + public void setSkipInitializeFileInputFormat(boolean skipInitializeFileInputFormat) { + this.skipInitializeFileInputFormat = skipInitializeFileInputFormat; + } } diff --git a/ql/src/test/queries/clientpositive/spark_optimize_shared_work.q b/ql/src/test/queries/clientpositive/spark_optimize_shared_work.q new file mode 100644 index 0000000..fcc89aa --- /dev/null +++ b/ql/src/test/queries/clientpositive/spark_optimize_shared_work.q @@ -0,0 +1,13 @@ +set hive.strict.checks.cartesian.product=false; +set hive.mapred.mode=nonstrict; + +-- SORT_QUERY_RESULTS +-- disable hive.spark.optimize.shared.work +set hive.spark.optimize.shared.work=false; +explain select * from (select avg(key) from src) keyTmp,(select count(value) from src) 
valueTmp; +select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp; + +-- enable hive.spark.optimize.shared.work +set hive.spark.optimize.shared.work=true; +explain select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp; +select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp; diff --git a/ql/src/test/results/clientpositive/spark/spark_optimize_shared_work.q.out b/ql/src/test/results/clientpositive/spark/spark_optimize_shared_work.q.out new file mode 100644 index 0000000..16c2de3 --- /dev/null +++ b/ql/src/test/results/clientpositive/spark/spark_optimize_shared_work.q.out @@ -0,0 +1,221 @@ +Warning: Shuffle Join JOIN[14][tables = [$hdt$_0, $hdt$_1]] in Work 'Reducer 3' is a cross product +PREHOOK: query: explain select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp +PREHOOK: type: QUERY +POSTHOOK: query: explain select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (GROUP, 1) + Reducer 3 <- Reducer 2 (PARTITION-LEVEL SORT, 1), Reducer 5 (PARTITION-LEVEL SORT, 1) + Reducer 5 <- Map 4 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: avg(key) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 256 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 256 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: struct) + Map 4 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: value (type: string) + outputColumnNames: value + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(value) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: avg(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 256 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 256 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: double) + Reducer 3 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 265 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 265 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +Warning: Shuffle Join JOIN[14][tables = [$hdt$_0, $hdt$_1]] in Work 'Reducer 3' is a cross product +PREHOOK: query: select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +260.182 500 +Warning: Shuffle Join JOIN[14][tables = [$hdt$_0, $hdt$_1]] in Work 'Reducer 4' is a cross product +PREHOOK: query: explain select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp +PREHOOK: type: QUERY +POSTHOOK: query: explain select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Map 2 <- Map 1 (NONE, 1000) + Map 6 <- Map 1 (NONE, 1000) + Reducer 3 <- Map 2 (GROUP, 1) + Reducer 4 <- Reducer 3 (PARTITION-LEVEL SORT, 1), Reducer 7 (PARTITION-LEVEL SORT, 1) + Reducer 7 <- Map 6 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Map 2 + Map Operator Tree: + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: avg(key) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 256 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 256 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: struct) + Map 6 + Map Operator Tree: + Select Operator + expressions: value (type: string) + outputColumnNames: value + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(value) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Reduce Operator Tree: + Group By Operator + aggregations: avg(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 256 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 256 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: double) + Reducer 4 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 265 Basic stats: COMPLETE Column 
stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 265 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 7 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +Warning: Shuffle Join JOIN[14][tables = [$hdt$_0, $hdt$_1]] in Work 'Reducer 4' is a cross product +PREHOOK: query: select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * from (select avg(key) from src) keyTmp,(select count(value) from src) valueTmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +260.182 500
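Note on the two explain outputs above: the plans differ only in how src is scanned. With hive.spark.optimize.shared.work=false the table is read twice (Map 1 and Map 4), while with the flag on a single scan in Map 1 feeds the two aggregation branches as second-level map works (Map 2 and Map 6) over NONE edges, which is the M-R to M-M-R change named in the MapWork comment. Below is a rough sketch of how the flag can be flipped programmatically when inspecting the plan; it is not part of the patch, the class name is made up, and it assumes a working Hive-on-Spark environment with the qtest src table already loaded.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class SharedWorkExplainSketch {

  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.set("hive.execution.engine", "spark");
    // Toggle the flag introduced by this patch and compare the explain output.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION, true);

    SessionState.start(conf);
    Driver driver = new Driver(conf);
    driver.run("explain select * from (select avg(key) from src) keyTmp,"
        + "(select count(value) from src) valueTmp");

    List<String> plan = new ArrayList<String>();
    driver.getResults(plan);
    // With the flag on, the plan should show one TableScan over src feeding
    // two second-level map works; with it off, two independent scans.
    for (String line : plan) {
      System.out.println(line);
    }
  }
}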
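Note on SparkRuleDispatcher: unlike the stock DefaultRuleDispatcher, which keeps a single NodeProcessor per Rule, the new dispatcher takes a Guava Multimap and fires every processor bound to the matching rule in turn, returning the result of the last non-null processor. A minimal usage sketch follows; the class name, the lambda processors, and the use of ArrayListMultimap and DefaultGraphWalker are illustrative assumptions, not part of the patch.

import java.util.List;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.SparkRuleDispatcher;

public class SparkRuleDispatcherSketch {

  // Walks the given operator tree roots and fires two processors for every
  // TableScan that matches the rule, showing how several actions could be
  // attached to one rule in a single walk.
  public static void walk(List<Node> topNodes, NodeProcessorCtx procCtx)
      throws SemanticException {
    Multimap<Rule, NodeProcessor> rules = ArrayListMultimap.create();
    Rule tsRule = new RuleRegExp("TS rule", TableScanOperator.getOperatorName() + "%");

    // Both processors are registered under the same rule; SparkRuleDispatcher
    // invokes each of them, whereas DefaultRuleDispatcher keeps only one.
    rules.put(tsRule, (nd, stack, ctx, outputs) -> {
      // placeholder processor, e.g. analyze-table handling
      return null;
    });
    rules.put(tsRule, (nd, stack, ctx, outputs) -> {
      // placeholder processor, e.g. work generation
      return null;
    });

    Dispatcher disp = new SparkRuleDispatcher(null, rules, procCtx);
    GraphWalker walker = new DefaultGraphWalker(disp);
    walker.startWalking(topNodes, null);
  }
}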