diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index d320b47..294fa16 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -34,6 +34,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
@@ -817,6 +819,25 @@ public void setPosToAliasMap(Map<Integer, Set<String>> posToAliasMap) {
   }
 
   @Override
+  public Statistics getStatistics(HiveConf conf) throws HiveException {
+    Statistics stats = this.getConf().getStatistics();
+    if (stats == null) {
+      long maxSize = 0;
+      stats = new Statistics();
+      for (Operator<? extends OperatorDesc> parent : this.getParentOperators()) {
+        long size = parent.getStatistics(conf).getNumberOfBytes();
+        if (maxSize < size) {
+          maxSize = size;
+        }
+      }
+
+      stats.setNumberOfBytes(maxSize * this.getParentOperators().size());
+      this.getConf().setStatistics(stats);
+    }
+    return stats;
+  }
+
+  @Override
   public boolean opAllowedBeforeMapJoin() {
     return false;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
index 24e3d7a..1d6fc77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
@@ -28,9 +28,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -306,6 +308,16 @@ private void endGroupIfNecessary(int currentChildIndex) throws HiveException {
   }
 
   @Override
+  public Statistics getStatistics(HiveConf conf) throws HiveException {
+    Statistics stats = this.getConf().getStatistics();
+    if (stats == null) {
+      stats = super.getStatistics(conf);
+      stats.setNumberOfBytes(stats.getNumberOfBytes() / this.getParentOperators().size());
+    }
+    return stats;
+  }
+
+  @Override
   public void startGroup() throws HiveException {
     lastChildIndex = 0;
     super.startGroup();
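The join estimate above is deliberately pessimistic: the output is assumed to be as large as the biggest input, replicated once per join input. A standalone sketch of that arithmetic with made-up sizes (toy code, not part of the patch):

```java
// Toy illustration of CommonJoinOperator's worst-case size estimate:
// (largest parent estimate) x (number of parents).
public class JoinEstimateSketch {
  public static void main(String[] args) {
    long[] parentBytes = {800L, 200L, 500L}; // three join inputs
    long maxSize = 0;
    for (long size : parentBytes) {
      if (maxSize < size) {
        maxSize = size;
      }
    }
    System.out.println(maxSize * parentBytes.length); // 800 * 3 = 2400 bytes
  }
}
```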
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index ca48f5e..9439243 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -1504,6 +1506,25 @@ public boolean opAllowedBeforeSortMergeJoin() {
     return true;
   }
 
+  /**
+   * Computes and retrieves the stats for this operator. The default
+   * implementation assumes the output size equals the summed size of the inputs.
+   *
+   * @return Statistics for this operator
+   */
+  public Statistics getStatistics(HiveConf conf) throws HiveException {
+    Statistics stats = this.getConf().getStatistics();
+
+    if (stats == null) {
+      stats = new Statistics();
+      for (Operator<? extends OperatorDesc> parent : this.getParentOperators()) {
+        stats.addNumberOfBytes(parent.getStatistics(conf).getNumberOfBytes());
+      }
+      this.getConf().setStatistics(stats);
+    }
+    return stats;
+  }
+
   @Override
   public String toString() {
     return getName() + "[" + getIdentifier() + "]";
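The default implementation is a compute-once pattern: an operator pulls estimates recursively from its parents, sums them, and memoizes the result on its descriptor so repeated calls are cheap. A minimal standalone sketch of the same pattern with toy classes (not Hive's API):

```java
import java.util.ArrayList;
import java.util.List;

public class StatsSketch {
  static class Stats { long bytes = -1; }          // -1 means "not computed yet"

  static class Op {
    final List<Op> parents = new ArrayList<Op>();
    Stats cached;      // plays the role of OperatorDesc.getStatistics()
    long ownBytes;     // leaves (table scans) know their own size

    long getBytes() {
      if (cached == null) {                        // compute only once
        cached = new Stats();
        if (parents.isEmpty()) {
          cached.bytes = ownBytes;                 // leaf: use its own estimate
        } else {
          cached.bytes = 0;
          for (Op p : parents) {                   // default: output == sum of inputs
            cached.bytes += p.getBytes();
          }
        }
      }
      return cached.bytes;
    }
  }

  public static void main(String[] args) {
    Op scan1 = new Op(); scan1.ownBytes = 1000;
    Op scan2 = new Op(); scan2.ownBytes = 500;
    Op child = new Op();
    child.parents.add(scan1);
    child.parents.add(scan2);
    System.out.println(child.getBytes());          // 1500
  }
}
```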
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index a5a8943..56b26dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,9 +27,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -316,6 +317,18 @@ private void publishStats() throws HiveException {
   }
 
   @Override
+  public Statistics getStatistics(HiveConf conf) throws HiveException {
+    Statistics stats = this.getConf().getStatistics();
+    if (stats == null) {
+      stats = new Statistics();
+      stats.addNumberOfBytes(Utilities.getSize(alias, getConf().getTable(), conf,
+          this, getConf().getPruningPredicate()));
+      this.getConf().setStatistics(stats);
+    }
+    return stats;
+  }
+
+  @Override
   public boolean supportSkewJoinOptimization() {
     return true;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index f3c34d1..2c4cb56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -116,6 +116,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -2588,22 +2590,30 @@ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSu
         + maxReducers + " totalInputFileSize=" + totalInputFileSize);
     }
 
+    // If this map reduce job writes final data to a table and bucketing is being inferred,
+    // and the user has configured Hive to do this, make sure the number of reducers is a
+    // power of two
+    boolean powersOfTwo = conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
+        finalMapRed && !work.getBucketedColsByDirectory().isEmpty();
+
+    return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo);
+  }
+
+  public static int estimateReducers(long totalInputFileSize, long bytesPerReducer,
+      int maxReducers, boolean powersOfTwo) {
+
     int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
     reducers = Math.max(1, reducers);
     reducers = Math.min(maxReducers, reducers);
 
-    // If this map reduce job writes final data to a table and bucketing is being inferred,
-    // and the user has configured Hive to do this, make sure the number of reducers is a
-    // power of two
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
-        finalMapRed && !work.getBucketedColsByDirectory().isEmpty()) {
-      int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
-      int reducersPowerTwo = (int)Math.pow(2, reducersLog);
+    int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
+    int reducersPowerTwo = (int)Math.pow(2, reducersLog);
 
+    if (powersOfTwo) {
       // If the original number of reducers was a power of two, use that
       if (reducersPowerTwo / 2 == reducers) {
-        return reducers;
+        // nothing to do
       } else if (reducersPowerTwo > maxReducers) {
         // If the next power of two greater than the original number of reducers is greater
         // than the max number of reducers, use the preceding power of two, which is strictly
@@ -2614,7 +2624,6 @@ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSu
         reducers = reducersPowerTwo;
       }
     }
-
     return reducers;
   }
 
@@ -2970,5 +2979,58 @@ private static void createTmpDirs(Configuration conf,
       }
     }
   }
+
+  public static long getSize(String alias, Table table, HiveConf conf,
+      TableScanOperator topOp, ExprNodeDesc expr) throws HiveException {
+    long result = 0;
+    int numPartitions = 0;
+    Map<String, PrunedPartitionList> prunedPartitionsMap
+        = new HashMap<String, PrunedPartitionList>();
+
+    if (!table.isPartitioned()) {
+      result = getSize(conf, table);
+    }
+    else {
+      // For partitioned tables, get the size of all the partitions
+      PrunedPartitionList partsList = PartitionPruner.prune(table, expr, conf,
+          alias, prunedPartitionsMap);
+      numPartitions = partsList.getNotDeniedPartns().size();
+      for (Partition part : partsList.getNotDeniedPartns()) {
+        result += getSize(conf, part);
+      }
+    }
+    return result;
+  }
+
+  private static long getSize(HiveConf conf, String size, Path path) {
+    // If the size is present in the metastore, use it
+    if (size != null) {
+      try {
+        return Long.valueOf(size);
+      } catch (NumberFormatException e) {
+        return -1;
+      }
+    }
+
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      return fs.getContentSummary(path).getLength();
+    } catch (Exception e) {
+      return -1;
+    }
+  }
+
+  private static long getSize(HiveConf conf, Table table) {
+    Path path = table.getPath();
+    String size = table.getProperty("totalSize");
+    return getSize(conf, size, path);
+  }
+
+  private static long getSize(HiveConf conf, Partition partition) {
+    Path path = partition.getPartitionPath();
+    String size = partition.getParameters().get("totalSize");
+
+    return getSize(conf, size, path);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
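With the rounding logic extracted into a standalone, conf-free method, the arithmetic is easy to check with concrete numbers. A small driver against the new public Utilities.estimateReducers() (the sizes below are made up for illustration):

```java
import org.apache.hadoop.hive.ql.exec.Utilities;

public class EstimateReducersExample {
  public static void main(String[] args) {
    long gb = 1000L * 1000 * 1000;

    // 10 GB of input at 1 GB per reducer -> ceil(10 / 1) = 10 reducers
    System.out.println(Utilities.estimateReducers(10 * gb, gb, 999, false)); // 10

    // with power-of-two rounding, 10 is rounded up to the next power of two
    System.out.println(Utilities.estimateReducers(10 * gb, gb, 999, true));  // 16

    // if the next power of two (16) would exceed maxReducers (12), the
    // preceding power of two is used instead
    System.out.println(Utilities.estimateReducers(10 * gb, gb, 12, true));   // 8
  }
}
```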
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
index 51464e5..0004a0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
@@ -125,6 +125,9 @@ protected void addPruningPred(Map<TableScanOperator, ExprNodeDesc> opToPrunner,
     // Put the mapping from table scan operator to pruner_pred
     opToPrunner.put(top, pruner_pred);
 
+    // Set the predicate in the table scan descriptor directly
+    top.getConf().setPruningPredicate(pruner_pred);
+
     return;
   }
 
@@ -165,6 +168,9 @@ protected void addPruningPred(Map<TableScanOperator, Map<String, ExprNodeDesc>>
     // Put the mapping from table scan operator to part-pruner map
     opToPrunner.put(top, partToPruner);
 
+    // Set the predicate in the table scan descriptor directly
+    top.getConf().setPruningPredicate(pruner_pred);
+
     return;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
index 69af086..bf7e020 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
@@ -154,7 +154,7 @@ public static PrunedPartitionList prune(TableScanOperator ts, ParseContext parse
    * pruner condition.
    * @throws HiveException
    */
-  private static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
+  public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
       HiveConf conf, String alias, Map<String, PrunedPartitionList> prunedPartitionsMap)
       throws HiveException {
     LOG.trace("Started pruning partiton");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezTaskWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezTaskWalker.java
deleted file mode 100644
index d16fd24..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezTaskWalker.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.parse;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
-import org.apache.hadoop.hive.ql.lib.Dispatcher;
-import org.apache.hadoop.hive.ql.lib.Node;
-
-/**
- * Walks the operator tree in DFS fashion.
- */
-public class GenTezTaskWalker extends DefaultGraphWalker {
-
-  /**
-   * constructor of the walker - the dispatcher is passed.
-   *
-   * @param disp
-   *          the dispatcher to be called for each node visited
-   */
-  public GenTezTaskWalker(Dispatcher disp) {
-    super(disp);
-  }
-
-  /**
-   * Walk the given operator.
-   *
-   * @param nd
-   *          operator being walked
-   */
-  @Override
-  public void walk(Node nd) throws SemanticException {
-    List<? extends Node> children = nd.getChildren();
-
-    // maintain the stack of operators encountered
-    opStack.push(nd);
-    Boolean result = dispatchAndReturn(nd, opStack);
-
-    // move all the children to the front of queue
-    for (Node ch : children) {
-      walk(ch);
-    }
-
-    // done with this operator
-    opStack.pop();
-  }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
new file mode 100644
index 0000000..0807d65
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * OptimizeTezProcContext. OptimizeTezProcContext maintains information
+ * about the current operator plan as we walk the operator tree
+ * to do some additional optimizations on it.
+ *
+ */
+public class OptimizeTezProcContext implements NodeProcessorCtx {
+
+  public final ParseContext parseContext;
+  public final HiveConf conf;
+
+  public final Set<ReadEntity> inputs;
+  public final Set<WriteEntity> outputs;
+
+  public final Set<ReduceSinkOperator> visitedReduceSinks
+    = new HashSet<ReduceSinkOperator>();
+
+  // rootOperators are all the table scan operators in sequence
+  // of traversal
+  public final Deque<Operator<? extends OperatorDesc>> rootOperators;
+
+  @SuppressWarnings("unchecked")
+  public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+      Deque<Operator<? extends OperatorDesc>> rootOperators) {
+
+    this.conf = conf;
+    this.parseContext = parseContext;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.rootOperators = rootOperators;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 0fb449c..a049770 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7756,6 +7756,10 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException {
 
     // Add a mapping from the table scan operator to Table
     topToTable.put((TableScanOperator) top, tab);
+
+    // set the table in the table scan descriptor directly
+    ((TableScanOperator) top).getConf().setTable(tab);
+
     Map<String, String> props = qb.getTabPropsForAlias(alias);
     if (props != null) {
       topToTableProps.put((TableScanOperator) top, props);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SetReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SetReducerParallelism.java
new file mode 100644
index 0000000..51302ce
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SetReducerParallelism.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+/**
+ * SetReducerParallelism determines how many reducers should
+ * be run for a given reduce sink.
+ */
+public class SetReducerParallelism implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(SetReducerParallelism.class.getName());
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeTezProcContext context = (OptimizeTezProcContext) procContext;
+
+    ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+    ReduceSinkDesc desc = sink.getConf();
+
+    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+    int constantReducers = context.conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+
+    if (context.visitedReduceSinks.contains(sink)) {
+      // skip walking the children
+      LOG.debug("Already processed reduce sink: " + sink.getName());
+      return true;
+    }
+
+    context.visitedReduceSinks.add(sink);
+
+    try {
+      if (desc.getNumReducers() <= 0) {
+        if (constantReducers > 0) {
+          LOG.info("Parallelism for reduce sink "+sink+" set by user to "+constantReducers);
+          desc.setNumReducers(constantReducers);
+        } else {
+          long numberOfBytes = 0;
+
+          // we need to add up all the estimates from the siblings of this reduce sink
+          for (Operator<? extends OperatorDesc> sibling:
+            sink.getChildOperators().get(0).getParentOperators()) {
+            numberOfBytes += sibling.getStatistics(context.conf).getNumberOfBytes();
+          }
+
+          int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+              maxReducers, false);
+          LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers);
+          desc.setNumReducers(numReducers);
+        }
+      } else {
+        LOG.info("Number of reducers determined to be: "+desc.getNumReducers());
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+
+    return false;
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SetStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SetStatistics.java
new file mode 100644
index 0000000..bb3a4f4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SetStatistics.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
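The decision order in process() — an explicit user setting wins, otherwise the summed sibling estimates drive the computation — can be sketched in isolation. In the snippet below only Utilities.estimateReducers() is the patch's API; the rest is illustrative scaffolding:

```java
import org.apache.hadoop.hive.ql.exec.Utilities;

public class ParallelismSketch {
  static int chooseParallelism(int userConfiguredReducers,
      long[] siblingByteEstimates, long bytesPerReducer, int maxReducers) {
    if (userConfiguredReducers > 0) {
      return userConfiguredReducers;   // user fixed the value explicitly
    }
    long total = 0;
    for (long bytes : siblingByteEstimates) {
      total += bytes;                  // all reduce sinks feeding the same child
    }
    return Utilities.estimateReducers(total, bytesPerReducer, maxReducers, false);
  }

  public static void main(String[] args) {
    long gb = 1000L * 1000 * 1000;
    // two 3 GB inputs into one join, 1 GB per reducer -> 6 reducers
    System.out.println(chooseParallelism(-1, new long[] {3 * gb, 3 * gb}, gb, 999));
  }
}
```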
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * SetStatistics just invokes getStatistics on FileSinkOperators
+ * to populate the stats in the operator tree.
+ */
+public class SetStatistics implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(SetStatistics.class.getName());
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeTezProcContext context = (OptimizeTezProcContext) procContext;
+    FileSinkOperator sink = (FileSinkOperator) nd;
+    try {
+      sink.getStatistics(context.conf);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+    return false;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 5abedfe..7e51310 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -62,13 +62,49 @@ public TezCompiler() {
   }
 
   @Override
+  protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) throws SemanticException {
+
+    // Sequence of TableScan operators to be walked
+    Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
+    deque.addAll(pCtx.getTopOps().values());
+
+    // Create the context for the walker
+    OptimizeTezProcContext procCtx
+      = new OptimizeTezProcContext(conf, pCtx, inputs, outputs, deque);
+
+    // create a walker which walks the tree in a DFS manner while maintaining
+    // the operator stack.
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+        ReduceSinkOperator.getOperatorName() + "%"),
+        new SetReducerParallelism());
+
+    // if this is an explain statement add rule to generate statistics for
+    // the whole tree.
+    if (pCtx.getContext().getExplain()) {
+      opRules.put(new RuleRegExp(new String("Set statistics - FileSink"),
+          FileSinkOperator.getOperatorName() + "%"),
+          new SetStatistics());
+    }
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    GraphWalker ogw = new TezWalker(disp);
+    ogw.startWalking(topNodes, null);
+  }
+
+  @Override
   protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
 
-    // generate map reduce plans
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
 
+    // Sequence of TableScan operators to be walked
     Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
     deque.addAll(pCtx.getTopOps().values());
@@ -92,7 +128,7 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
-    GraphWalker ogw = new GenTezTaskWalker(disp);
+    GraphWalker ogw = new TezWalker(disp);
     ogw.startWalking(topNodes, null);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java
new file mode 100644
index 0000000..2f63c1a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+
+/**
+ * Walks the operator tree in DFS fashion.
+ */
+public class TezWalker extends DefaultGraphWalker {
+
+  /**
+   * constructor of the walker - the dispatcher is passed.
+   *
+   * @param disp
+   *          the dispatcher to be called for each node visited
+   */
+  public TezWalker(Dispatcher disp) {
+    super(disp);
+  }
+
+  /**
+   * Walk the given operator.
+   *
+   * @param nd
+   *          operator being walked
+   */
+  @Override
+  public void walk(Node nd) throws SemanticException {
+    List<? extends Node> children = nd.getChildren();
+
+    // maintain the stack of operators encountered
+    opStack.push(nd);
+    Boolean skip = dispatchAndReturn(nd, opStack);
+
+    if (skip == null || !skip) {
+      // move all the children to the front of queue
+      for (Node ch : children) {
+        walk(ch);
+      }
+    }
+
+    // done with this operator
+    opStack.pop();
+  }
+}
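The behavioral difference between the deleted GenTezTaskWalker and the new TezWalker is the interpretation of the processor's return value: a Boolean TRUE now prunes the subtree instead of being ignored. A hypothetical processor relying on that contract might look like this (illustrative, not part of the patch):

```java
import java.util.Stack;

import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class PruneSubtreeProcessor implements NodeProcessor {
  @Override
  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
      Object... nodeOutputs) throws SemanticException {
    boolean alreadyVisited = false; // real processors track this in ctx,
                                    // e.g. OptimizeTezProcContext.visitedReduceSinks
    // Returning Boolean.TRUE makes TezWalker.walk() skip nd's children;
    // null or FALSE means "keep descending".
    return alreadyVisited ? Boolean.TRUE : Boolean.FALSE;
  }
}
```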
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index 4bb28b5..85abb7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -19,6 +19,19 @@
 package org.apache.hadoop.hive.ql.plan;
 
 public class AbstractOperatorDesc implements OperatorDesc {
+  protected transient Statistics statistics;
+
+  @Override
+  @Explain(displayName = "Statistics", normalExplain = false)
+  public Statistics getStatistics() {
+    return statistics;
+  }
+
+  @Override
+  public void setStatistics(Statistics statistics) {
+    this.statistics = statistics;
+  }
+
   @Override
   public Object clone() throws CloneNotSupportedException {
     throw new CloneNotSupportedException("clone not supported");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index 36757e8..6c2efaf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -22,4 +22,6 @@
 public interface OperatorDesc extends Serializable, Cloneable {
   public Object clone() throws CloneNotSupportedException;
+  public Statistics getStatistics();
+  public void setStatistics(Statistics statistics);
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
new file mode 100644
index 0000000..302a6d6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+/**
+ * Statistics. Describes the output of an operator in terms of size, rows, etc.
+ * based on estimates.
+ *
+ */
+@SuppressWarnings("serial")
+public class Statistics implements Serializable {
+
+  // the only available stat right now is the amount of data flowing out of an
+  // operator
+  private long numberOfBytes = -1;
+
+  @Explain(displayName = "Estimated data size in bytes")
+  public long getNumberOfBytes() {
+    return numberOfBytes;
+  }
+
+  public void addNumberOfBytes(long numberOfBytes) {
+    if (this.numberOfBytes < 0) {
+      this.numberOfBytes = 0;
+    }
+
+    this.numberOfBytes += numberOfBytes;
+  }
+
+  public void setNumberOfBytes(long l) {
+    this.numberOfBytes = l;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index ec2f8f2..1150346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 
 /**
@@ -52,6 +54,10 @@
    */
   private int rowLimit = -1;
 
+  private transient Table table;
+
+  private transient ExprNodeDesc pruningPredicate;
+
   /**
    * A boolean variable set to true by the semantic analyzer only in case of the analyze command.
    *
@@ -71,10 +77,32 @@
   // input file name (big) to bucket number
   private Map<String, Integer> bucketFileNameMapping;
 
+  static {
+    PTFUtils.makeTransient(TableScanDesc.class, "pruningPredicate");
+    PTFUtils.makeTransient(TableScanDesc.class, "table");
+  }
+
   @SuppressWarnings("nls")
   public TableScanDesc() {
   }
 
+  public Table getTable() {
+    return table;
+  }
+
+  public void setTable(Table t) {
+    table = t;
+  }
+
+  public ExprNodeDesc getPruningPredicate() {
+    return pruningPredicate;
+  }
+
+  public void setPruningPredicate(ExprNodeDesc expr) {
+    pruningPredicate = expr;
+  }
+
   public TableScanDesc(final String alias) {
     this.alias = alias;
   }
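Taken together: TableScanDesc now carries the table and pruning predicate so TableScanOperator can size its scan, and Statistics is the accumulator those sizes flow into, with -1 as the "unknown" sentinel that the first addNumberOfBytes() resets to 0 before accumulating. A small usage sketch against the class introduced above:

```java
import org.apache.hadoop.hive.ql.plan.Statistics;

public class StatisticsExample {
  public static void main(String[] args) {
    Statistics stats = new Statistics();
    System.out.println(stats.getNumberOfBytes()); // -1: nothing known yet
    stats.addNumberOfBytes(1024);                 // sentinel reset to 0, then added
    stats.addNumberOfBytes(2048);
    System.out.println(stats.getNumberOfBytes()); // 3072
  }
}
```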