### Eclipse Workspace Patch 1.0 #P hive-2206-p6 Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (working copy) @@ -70,6 +70,15 @@ public ReduceSinkDesc() { } + private boolean needsOperationPathTagging; + public boolean getNeedsOperationPathTagging() { + return needsOperationPathTagging; + } + + public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) { + this.needsOperationPathTagging = needsOperationPathTagging; + } + public ReduceSinkDesc(java.util.ArrayList keyCols, int numDistributionKeys, java.util.ArrayList valueCols, @@ -78,6 +87,20 @@ java.util.ArrayList outputValueColumnNames, int tag, java.util.ArrayList partitionCols, int numReducers, final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this(keyCols, numDistributionKeys, valueCols, + outputKeyColumnNames, distinctColumnIndices, outputValueColumnNames, tag, + partitionCols, numReducers, keySerializeInfo, valueSerializeInfo, false); + } + + public ReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo, + boolean needsOperationPathTagging) { this.keyCols = keyCols; this.numDistributionKeys = numDistributionKeys; this.valueCols = valueCols; @@ -89,6 +112,7 @@ this.keySerializeInfo = keySerializeInfo; this.valueSerializeInfo = valueSerializeInfo; this.distinctColumnIndices = distinctColumnIndices; + this.needsOperationPathTagging = needsOperationPathTagging; } public java.util.ArrayList getOutputKeyColumnNames() { @@ -186,7 +210,7 @@ /** * Returns the sort order of the key columns. - * + * * @return null, which means ascending order for all key columns, or a String * of the same length as key columns, that consists of only "+" * (ascending order) and "-" (descending order). @@ -196,7 +220,7 @@ return keySerializeInfo.getProperties().getProperty( org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); } - + public void setOrder(String orderStr) { keySerializeInfo.getProperties().setProperty( org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java (revision 0) @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
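For orientation, a minimal sketch of how the new flag is meant to be used (hypothetical driver code, not part of this patch; the empty lists and null serialization descriptors are placeholders): the pre-existing eleven-argument constructor keeps its old behavior by delegating with needsOperationPathTagging = false, and the correlation optimizer opts a ReduceSink into operation-path tagging afterwards through the setter.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;

public class OperationPathTaggingSketch {
  public static void main(String[] args) {
    // Hypothetical, empty plan fragments; real callers pass the actual key,
    // value and partition expressions of the ReduceSink being built.
    ArrayList<ExprNodeDesc> keyCols = new ArrayList<ExprNodeDesc>();
    ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
    ArrayList<ExprNodeDesc> partitionCols = new ArrayList<ExprNodeDesc>();
    ArrayList<String> outKeyNames = new ArrayList<String>();
    ArrayList<String> outValueNames = new ArrayList<String>();
    List<List<Integer>> distinctColIndices = new ArrayList<List<Integer>>();

    // The original constructor now delegates to the new overload with
    // needsOperationPathTagging = false, so existing plans are unaffected.
    ReduceSinkDesc desc = new ReduceSinkDesc(keyCols, 0, valueCols,
        outKeyNames, distinctColIndices, outValueNames, -1, partitionCols,
        -1, null, null);

    // The correlation optimizer enables tagging only on ReduceSinks whose
    // rows are routed through a shared reducer and therefore must carry an
    // extra operation-path byte in the key (see ReduceSinkOperator.processOp).
    desc.setNeedsOperationPathTagging(true);
  }
}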
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.List; + +/** + * Correlation Fake ReduceSinkDesc. + * + */ +@Explain(displayName = "Correlation Fake Reduce Output Operator") +public class CorrelationFakeReduceSinkDesc implements Serializable { + private static final long serialVersionUID = 1L; + /** + * Key columns are passed to reducer in the "key". + */ + private java.util.ArrayList keyCols; + private java.util.ArrayList outputKeyColumnNames; + private List> distinctColumnIndices; + /** + * Value columns are passed to reducer in the "value". + */ + private java.util.ArrayList valueCols; + private java.util.ArrayList outputValueColumnNames; + /** + * Describe how to serialize the key. + */ + private TableDesc keySerializeInfo; + /** + * Describe how to serialize the value. + */ + private TableDesc valueSerializeInfo; + + /** + * The tag for this reducesink descriptor. + */ + private int tag; + + /** + * Number of distribution keys. + */ + private int numDistributionKeys; + + /** + * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). + * Partition columns decide the reducer that the current row goes to. + * Partition columns are not passed to reducer. + */ + private java.util.ArrayList partitionCols; + + private int numReducers; + + public CorrelationFakeReduceSinkDesc() { + } + + public CorrelationFakeReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this.keyCols = keyCols; + this.numDistributionKeys = numDistributionKeys; + this.valueCols = valueCols; + this.outputKeyColumnNames = outputKeyColumnNames; + this.outputValueColumnNames = outputValueColumnNames; + this.tag = tag; + this.numReducers = numReducers; + this.partitionCols = partitionCols; + this.keySerializeInfo = keySerializeInfo; + this.valueSerializeInfo = valueSerializeInfo; + this.distinctColumnIndices = distinctColumnIndices; + } + + public CorrelationFakeReduceSinkDesc(ReduceSinkDesc reduceSinkDesc){ + this.keyCols = reduceSinkDesc.getKeyCols(); + this.numDistributionKeys = reduceSinkDesc.getNumDistributionKeys(); + this.valueCols = reduceSinkDesc.getValueCols(); + this.outputKeyColumnNames = reduceSinkDesc.getOutputKeyColumnNames(); + this.outputValueColumnNames = reduceSinkDesc.getOutputValueColumnNames(); + this.tag = reduceSinkDesc.getTag(); + this.numReducers = reduceSinkDesc.getNumReducers(); + this.partitionCols = reduceSinkDesc.getPartitionCols(); + this.keySerializeInfo = reduceSinkDesc.getKeySerializeInfo(); + this.valueSerializeInfo = reduceSinkDesc.getValueSerializeInfo(); + this.distinctColumnIndices = reduceSinkDesc.getDistinctColumnIndices(); + } + + public java.util.ArrayList getOutputKeyColumnNames() { + return outputKeyColumnNames; + } + + public void setOutputKeyColumnNames( + java.util.ArrayList 
outputKeyColumnNames) { + this.outputKeyColumnNames = outputKeyColumnNames; + } + + public java.util.ArrayList getOutputValueColumnNames() { + return outputValueColumnNames; + } + + public void setOutputValueColumnNames( + java.util.ArrayList outputValueColumnNames) { + this.outputValueColumnNames = outputValueColumnNames; + } + + @Explain(displayName = "key expressions") + public java.util.ArrayList getKeyCols() { + return keyCols; + } + + public void setKeyCols(final java.util.ArrayList keyCols) { + this.keyCols = keyCols; + } + + public int getNumDistributionKeys() { + return this.numDistributionKeys; + } + + public void setNumDistributionKeys(int numKeys) { + this.numDistributionKeys = numKeys; + } + + @Explain(displayName = "value expressions") + public java.util.ArrayList getValueCols() { + return valueCols; + } + + public void setValueCols(final java.util.ArrayList valueCols) { + this.valueCols = valueCols; + } + + @Explain(displayName = "Map-reduce partition columns") + public java.util.ArrayList getPartitionCols() { + return partitionCols; + } + + public void setPartitionCols( + final java.util.ArrayList partitionCols) { + this.partitionCols = partitionCols; + } + + @Explain(displayName = "tag") + public int getTag() { + return tag; + } + + public void setTag(int tag) { + this.tag = tag; + } + + /** + * Returns the number of reducers for the map-reduce job. -1 means to decide + * the number of reducers at runtime. This enables Hive to estimate the number + * of reducers based on the map-reduce input data size, which is only + * available right before we start the map-reduce job. + */ + public int getNumReducers() { + return numReducers; + } + + public void setNumReducers(int numReducers) { + this.numReducers = numReducers; + } + + public TableDesc getKeySerializeInfo() { + return keySerializeInfo; + } + + public void setKeySerializeInfo(TableDesc keySerializeInfo) { + this.keySerializeInfo = keySerializeInfo; + } + + public TableDesc getValueSerializeInfo() { + return valueSerializeInfo; + } + + public void setValueSerializeInfo(TableDesc valueSerializeInfo) { + this.valueSerializeInfo = valueSerializeInfo; + } + + /** + * Returns the sort order of the key columns. + * + * @return null, which means ascending order for all key columns, or a String + * of the same length as key columns, that consists of only "+" + * (ascending order) and "-" (descending order). 
+ */ + @Explain(displayName = "sort order") + public String getOrder() { + return keySerializeInfo.getProperties().getProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); + } + + public void setOrder(String orderStr) { + keySerializeInfo.getProperties().setProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, + orderStr); + } + + public List> getDistinctColumnIndices() { + return distinctColumnIndices; + } + + public void setDistinctColumnIndices( + List> distinctColumnIndices) { + this.distinctColumnIndices = distinctColumnIndices; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (working copy) @@ -34,8 +34,8 @@ import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; -import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; @@ -580,7 +580,7 @@ } @Override - protected boolean allInitializedParentsAreClosed() { + public boolean allInitializedParentsAreClosed() { return true; } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.LineageInfo; @@ -80,7 +81,10 @@ // reducer private Map> groupOpToInputTables; private Map prunedPartitions; - + + // map the implementation of group by operator with the RS-GBY pattern to pattern with map-side + // aggregation enabled like GBY-RS-GBY. This variable is only used by CorrelationOptimizer + Map groupbyRegular2MapSide; /** * The lineage information. 
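The generic type arguments of the groupbyRegular2MapSide field above were lost in this copy of the patch. Judging from how CorrelationOptimizerUtils.applyCorrelation later calls groupbyRegular2MapSide.get(rsop) and treats the result as a map-side GroupByOperator, the map presumably has the shape sketched below; the wrapper class is hypothetical and only illustrates the intended key/value types.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;

// Hedged reconstruction: for each group-by that the planner generated in the
// plain RS-GBY form, remember the root (hash-mode, map-side) GroupByOperator
// of the equivalent GBY-RS-GBY plan so the correlation optimizer can splice
// it in later.
class GroupByPlanAlternatives {
  private final Map<ReduceSinkOperator, GroupByOperator> groupbyRegular2MapSide =
      new HashMap<ReduceSinkOperator, GroupByOperator>();

  void remember(ReduceSinkOperator regularPlanRS, GroupByOperator mapSideRoot) {
    groupbyRegular2MapSide.put(regularPlanRS, mapSideRoot);
  }

  GroupByOperator mapSideRootFor(ReduceSinkOperator regularPlanRS) {
    return groupbyRegular2MapSide.get(regularPlanRS);
  }
}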
*/ @@ -157,7 +161,8 @@ HashMap opToSamplePruner, SemanticAnalyzer.GlobalLimitCtx globalLimitCtx, HashMap nameToSplitSample, - HashSet semanticInputs, List> rootTasks) { + HashSet semanticInputs, List> rootTasks, + Map groupbyRegular2MapSide) { this.conf = conf; this.qb = qb; this.ast = ast; @@ -183,6 +188,7 @@ this.globalLimitCtx = globalLimitCtx; this.semanticInputs = semanticInputs; this.rootTasks = rootTasks; + this.groupbyRegular2MapSide = groupbyRegular2MapSide; } /** @@ -529,4 +535,8 @@ this.rootTasks.remove(rootTask); this.rootTasks.addAll(tasks); } + + public Map getGroupbyRegular2MapSide() { + return groupbyRegular2MapSide; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0) @@ -0,0 +1,808 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QB; +import org.apache.hadoop.hive.ql.parse.QBExpr; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +/** + * Implementation of rule-based correlation optimizer. The optimization is based on + * the paper "YSmart: Yet Another SQL-to-MapReduce Translator" + * (http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-7.pdf). + * This optimizer detects three kinds of + * correlations, Input Correlation (IC), Transit Correlation (TC) and Job Flow Correlation (JFC). 
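A hypothetical driver, only to make the two-pass protocol of this class explicit (the surrounding optimizer pipeline is assumed, not shown in this hunk): the first call to transform() merely snapshots each operator's column-expression map, row resolver and OpParseContext before later rewrites touch them, and the second call detects correlations on the resulting plan and applies the transformation using those snapshots.

import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

class CorrelationOptimizerTwoPassSketch {
  static ParseContext optimize(ParseContext pctx) throws SemanticException {
    CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer();
    pctx = correlationOptimizer.transform(pctx);  // phase 1: record original operator contexts
    // ... other Transform implementations are assumed to run here ...
    pctx = correlationOptimizer.transform(pctx);  // phase 2: detect correlations and rewrite
    return pctx;
  }
}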
+ */ + +public class CorrelationOptimizer implements Transform { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName()); + private final HashMap AliastoTabName; + private final HashMap AliastoTab; + + public CorrelationOptimizer() { + super(); + AliastoTabName = new HashMap(); + AliastoTab = new HashMap(); + pGraphContext = null; + } + + private void initializeAliastoTabNameMapping(QB qb) { + for (String alias: qb.getAliases()) { + AliastoTabName.put(alias, qb.getTabNameForAlias(alias)); + AliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias)); + } + for (String subqalias: qb.getSubqAliases()) { + QBExpr qbexpr = qb.getSubqForAlias(subqalias); + initializeAliastoTabNameMapping(qbexpr.getQB()); + } + } + + public static ExprNodeColumnDesc getStringColumn(String columnName) { + return new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, columnName, + "", false); + } + + protected ParseContext pGraphContext; + private LinkedHashMap, OpParseContext> opParseCtx; + private final LinkedHashMap, OpParseContext> originalOpParseCtx = + new LinkedHashMap, OpParseContext>(); + private final LinkedHashMap, RowResolver> originalOpRowResolver = + new LinkedHashMap, RowResolver>(); + private final LinkedHashMap, Map> originalOpColumnExprMap = + new LinkedHashMap, Map>(); + + private boolean isPhase1 = true; + + private Map groupbyRegular2MapSide; + + /** + * Transform the query tree. Firstly, find out correlations between operations. + * Then, group these operators in groups + * @param pactx + * current parse context + */ + public ParseContext transform(ParseContext pctx) throws SemanticException { + + if (isPhase1) { + + pGraphContext = pctx; + opParseCtx = pctx.getOpParseCtx(); + + CorrelationNodePhase1ProcCtx correlationCtx = new CorrelationNodePhase1ProcCtx(); + + Map opRules = new LinkedHashMap(); + + Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules, correlationCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + isPhase1 = false; + + } else { + + /* Types of correlations: + * 1) Input Correlation: Multiple nodes have input correlation + (IC) if their input relation sets are not disjoint; + 2) Transit Correlation: Multiple nodes have transit correlation + (TC) if they have not only input correlation, but + also the same partition key; + 3) Job Flow Correlation: A node has job flow correlation + (JFC) with one of its child nodes if it has the same + partition key as that child node. 
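The detection implemented below ultimately reduces to a key-overlap test between a parent ReduceSinkOperator and the key columns propagated down from its child. A simplified, self-contained sketch of that test follows; the real code additionally maps columns through each operator's column-expression map and tightens the check when the child is a JOIN.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

class JobFlowCorrelationCheckSketch {
  // Two ReduceSinks are treated as job-flow correlated when their
  // key-column sets intersect (see findCorrelatedReduceSinkOperators below).
  static boolean keysOverlap(ReduceSinkOperator parentRS, Set<String> childKeyColumns) {
    Set<String> parentKeyColumns = new HashSet<String>();
    List<ExprNodeDesc> keys = parentRS.getConf().getKeyCols();
    for (ExprNodeDesc key : keys) {
      if (key instanceof ExprNodeColumnDesc) {
        parentKeyColumns.add(((ExprNodeColumnDesc) key).getColumn());
      }
    }
    Set<String> intersection = new HashSet<String>(parentKeyColumns);
    intersection.retainAll(childKeyColumns);
    return !intersection.isEmpty();
  }
}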
+ * */ + + pGraphContext = pctx; + opParseCtx = pctx.getOpParseCtx(); + + groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide(); + + initializeAliastoTabNameMapping(pGraphContext.getQB()); + + // 1: find out correlation + CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx(); + + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "RS%"), new CorrelationNodeProc()); + + Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + + // 2: transform the query plan tree + LOG.info("Begain query plan transformation based on intra-query correlations"); + for (IntraQueryCorrelation correlation: correlationCtx.getCorrelations()) { + pGraphContext = CorrelationOptimizerUtils.applyCorrelation( + correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver, + groupbyRegular2MapSide, originalOpParseCtx); + } + LOG.info("Finish query plan transformation based on intra-query correlations"); + + } + + return pGraphContext; + } + + + + private NodeProcessor getPhase1DefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + Operator op = (Operator)nd; + OpParseContext opCtx= opParseCtx.get(op); + + if (op.getColumnExprMap() != null) { + originalOpColumnExprMap.put(op, op.getColumnExprMap()); + } + originalOpParseCtx.put(op, opCtx); + originalOpRowResolver.put(op, opCtx.getRowResolver()); + + return null; + } + }; + } + + private class CorrelationNodeProc implements NodeProcessor { + + public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop) { + Operator op = rsop.getChildOperators().get(0); + while(!op.getName().equals("RS")) { + if (op.getName().equals("FS")) { + return null; + } + assert op.getChildOperators().size() <= 1; + op = op.getChildOperators().get(0); + } + return (ReduceSinkOperator)op; + } + + /** + * Find all peer ReduceSinkOperators (which have the same child operator of op) of op (op included). 
+ */ + private ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + private ArrayList findCorrelatedReduceSinkOperators(Operator op, + HashSet keyColumns, IntraQueryCorrelation correlation) throws Exception{ + + LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName()); + + ArrayList correlatedReduceSinkOps = new ArrayList(); + if (op.getParentOperators() == null) { + return correlatedReduceSinkOps; + } + if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)) { + assert op.getParentOperators().size() == 1; + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators( + (Operator)op.getParentOperators().get(0), keyColumns, correlation)); + } else if (originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)) { + HashSet newKeyColumns = new HashSet(); + for (String keyColumn: keyColumns) { + ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn); + if (col instanceof ExprNodeColumnDesc) { + newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn()); + } + } + + if (op.getName().equals("JOIN")) { + HashSet tableNeedToCheck = new HashSet(); + for (String keyColumn: keyColumns) { + for (ColumnInfo cinfo: originalOpParseCtx.get(op).getRowResolver().getColumnInfos()) { + if (keyColumn.equals(cinfo.getInternalName())) { + tableNeedToCheck.add(cinfo.getTabAlias()); + } + } + } + + for (Object parent: op.getParentOperators()) { + assert originalOpParseCtx.get(parent).getRowResolver().getTableNames().size() == 1; + for (String tbl: originalOpParseCtx.get(parent).getRowResolver().getTableNames()) { + if (tableNeedToCheck.contains(tbl)) { + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)parent, newKeyColumns, correlation)); + break; + } + } + } + + } else { + assert op.getParentOperators().size() == 1; + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)op.getParentOperators().get(0), newKeyColumns, correlation)); + } + + } else if (originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator) { + + HashSet newKeyColumns = new HashSet(); + for (String keyColumn: keyColumns) { + ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn); + if (col instanceof ExprNodeColumnDesc) { + newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn()); + } + } + + ReduceSinkOperator rsop = (ReduceSinkOperator)op; + HashSet thisKeyColumns = new HashSet(); + for (ExprNodeDesc key: rsop.getConf().getKeyCols()) { + if (key instanceof ExprNodeColumnDesc) { + thisKeyColumns.add(((ExprNodeColumnDesc)key).getColumn()); + } + } + + // if the intersection of newKeyColumns and thisKeyColumns is not empty, isCorrelated is true + boolean isCorrelated = false; + // TODO: should use if intersection is empty to evaluate if two corresponding operators are correlated + Set intersection = new HashSet(newKeyColumns); + intersection.retainAll(thisKeyColumns); + isCorrelated = !(intersection.isEmpty()); + + ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop); + + if (isCorrelated) { + if 
(nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("JOIN")) { + if (intersection.size() != nextChildReduceSinkOperator.getConf().getKeyCols().size() || + intersection.size() != rsop.getConf().getKeyCols().size()) { + isCorrelated = false; + } + } + } + + if (isCorrelated) { + LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is correlated"); + LOG.info("keys of this operator: " + thisKeyColumns.toString()); + LOG.info("keys of child operator: " + keyColumns.toString()); + LOG.info("keys of child operator mapped to this operator:" + newKeyColumns.toString()); + if (((Operator)(op.getChildOperators().get(0))).getName().equals("JOIN")) { + ArrayList peers = findPeerReduceSinkOperators(rsop); + correlatedReduceSinkOps.addAll(peers); + } else { + correlatedReduceSinkOps.add(rsop); + } + if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("GBY") && + (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())) { + correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator); + } + + } else { + LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is not correlated"); + LOG.info("keys of this operator: " + thisKeyColumns.toString()); + LOG.info("keys of child operator: " + keyColumns.toString()); + LOG.info("keys of child operator mapped to this operator:" + newKeyColumns.toString()); + correlatedReduceSinkOps.clear(); + correlation.getRSGBYToBeReplacedByGBYRSGBY().clear(); + } + } else { + throw new Exception("Correlation optimizer: ReduceSinkOperator " + op.getIdentifier() + " does not have ColumnExprMap"); + } + return correlatedReduceSinkOps; + } + + private ArrayList exploitJFC(ReduceSinkOperator op, CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation) { + + correlationCtx.addWalked(op); + correlation.addToAllReduceSinkOperators(op); + + ArrayList ReduceSinkOperators = new ArrayList(); + + boolean sholdDetect = true; + + ArrayList keys = op.getConf().getKeyCols(); + HashSet keyColumns = new HashSet(); + for (ExprNodeDesc key: keys) { + if (!(key instanceof ExprNodeColumnDesc)) { + sholdDetect = false; + } else { + keyColumns.add(((ExprNodeColumnDesc)key).getColumn()); + } + } + + if (sholdDetect) { + ArrayList newReduceSinkOperators = new ArrayList(); + for (Operator parent: op.getParentOperators()) { + try { + ArrayList correlatedReduceSinkOperators = + findCorrelatedReduceSinkOperators(parent, keyColumns, correlation); + if (correlatedReduceSinkOperators == null || correlatedReduceSinkOperators.size() == 0) { + newReduceSinkOperators.add(op); + } else { + ArrayList deduplicatedCorrelatedReduceSinkOperators = new ArrayList(); + for (ReduceSinkOperator rsop: correlatedReduceSinkOperators) { + if (!deduplicatedCorrelatedReduceSinkOperators.contains(rsop)) { + deduplicatedCorrelatedReduceSinkOperators.add(rsop); + } + } + for (ReduceSinkOperator rsop: deduplicatedCorrelatedReduceSinkOperators) { + if ( !correlation.getUp2downRSops().containsKey(op) ) { + correlation.getUp2downRSops().put(op, new ArrayList()); + } + correlation.getUp2downRSops().get(op).add(rsop); + + if ( !correlation.getDown2upRSops().containsKey(rsop)) { + correlation.getDown2upRSops().put(rsop, new ArrayList()); + } + correlation.getDown2upRSops().get(rsop).add(op); + ArrayList exploited = exploitJFC(rsop, correlationCtx, correlation); + if (exploited == null || exploited.size() == 0) { + newReduceSinkOperators.add(rsop); + } else { + newReduceSinkOperators.addAll(exploited); + } + } + } + + } 
catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + ReduceSinkOperators.clear(); + ReduceSinkOperators.addAll(newReduceSinkOperators); + } + return ReduceSinkOperators; + } + + private TableScanOperator findTableScanOPerator(Operator startPoint) { + Operator thisOp = (Operator) startPoint.getParentOperators().get(0); + while(true) { + if (thisOp.getName().equals("RS")) { + return null; + } else if (thisOp.getName().equals("TS")) { + return (TableScanOperator)thisOp; + } + else { + if (thisOp.getParentOperators() != null) { + thisOp = (Operator) thisOp.getParentOperators().get(0); + } + else { + break; + } + } + } + return null; + } + + private void annotateOpPlan(IntraQueryCorrelation correlation) { + HashMap bottomReduceSink2OpPlanMap = new HashMap(); + int count = 0; + + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + if (!bottomReduceSink2OpPlanMap.containsKey(rsop)) { + bottomReduceSink2OpPlanMap.put(rsop, count); + for (ReduceSinkOperator peerRSop: findPeerReduceSinkOperators(rsop)) { + if (correlation.getBottomReduceSinkOperators().contains(peerRSop)) { + bottomReduceSink2OpPlanMap.put(peerRSop, count); + } + } + count++; + } + } + correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OpPlanMap); + } + + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, + Object... nodeOutputs) throws SemanticException { + LOG.info("Walk to operator " + ((Operator)nd).getIdentifier() + " " + ((Operator)nd).getName()); + + CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx)ctx; + + ReduceSinkOperator op = (ReduceSinkOperator)nd; + + if (correlationCtx.isWalked(op)) { + return null; + } + + if (op.getConf().getKeyCols().size() == 0 || + (!op.getChildOperators().get(0).getName().equals("JOIN") && + !op.getChildOperators().get(0).getName().equals("GBY"))) { + correlationCtx.addWalked(op); + return null; + } + + // 1: find out correlation + IntraQueryCorrelation correlation = new IntraQueryCorrelation(); + ArrayList peerReduceSinkOperators = findPeerReduceSinkOperators(op); + ArrayList bottomReduceSinkOperators = new ArrayList(); + for (ReduceSinkOperator rsop: peerReduceSinkOperators) { + + ArrayList thisBottomReduceSinkOperators= exploitJFC(rsop, correlationCtx, correlation); + + if (peerReduceSinkOperators.size() == 1) { + correlation.addToRSGBYToBeReplacedByGBYRSGBY(rsop); + } + if (thisBottomReduceSinkOperators.size() == 0) { + thisBottomReduceSinkOperators.add(rsop); + } else { + boolean isClear = false; + for (ReduceSinkOperator bottomrsop: thisBottomReduceSinkOperators) { + TableScanOperator tsop = findTableScanOPerator(bottomrsop); + if (tsop == null) { + isClear = true; // currently the optimizer can only optimize correlations involving source tables + } else { + if ( !correlation.getTop2TSops().containsKey(rsop) ) { + correlation.getTop2TSops().put(rsop, new ArrayList()); + } + correlation.getTop2TSops().get(rsop).add(tsop); + + if ( !correlation.getBottom2TSops().containsKey(bottomrsop)) { + correlation.getBottom2TSops().put(bottomrsop, new ArrayList()); + } + correlation.getBottom2TSops().get(bottomrsop).add(tsop); + } + } + if (isClear) { + thisBottomReduceSinkOperators.clear(); + thisBottomReduceSinkOperators.add(rsop); + } + } + bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators); + } + + if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) { + LOG.info("has job flow correlation"); + correlation.setJobFlowCorrelation(true); + 
correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators); + annotateOpPlan(correlation); + } + + if (correlation.hasJobFlowCorrelation()) { + boolean hasICandTC = findICandTC(correlation); + LOG.info("has input correlation and transit correlation? " + hasICandTC); + correlation.setInputCorrelation(hasICandTC); + correlation.setTransitCorrelation(hasICandTC); + boolean isInvolve = isInvolveSelfJoin(correlation); + LOG.info("involve self-join? " + isInvolve); + correlation.setInvolveSelfJoin(isInvolve); + //TODO: support self-join involved cases. For self-join related operation paths, after the correlation dispatch operator, each path should be filtered by a + // filter operator + if (!isInvolve) { + LOG.info("correlation detected"); + correlationCtx.addCorrelation(correlation); + } else { + LOG.info("correlation discarded. The current optimizer cannot optimize it"); + } + } + + correlationCtx.addWalked(op); + + return null; + } + + private boolean isInvolveSelfJoin(IntraQueryCorrelation correlation) { + boolean isInvolve = false; + for (Entry> entry: correlation.getTable2CorrelatedRSops().entrySet()) { + for (ReduceSinkOperator rsop: entry.getValue()) { + HashSet intersection = new HashSet(findPeerReduceSinkOperators(rsop)); + intersection.retainAll(entry.getValue()); + + // if involve self-join + if (intersection.size() > 1) { + isInvolve = true; + return isInvolve; + } + } + } + return isInvolve; + } + + private boolean findICandTC(IntraQueryCorrelation correlation) { + + boolean hasICandTC = false; + HashMap> table2RSops = new HashMap>(); + HashMap> table2TSops = new HashMap>(); + + for (Entry> entry: correlation.getBottom2TSops().entrySet()) { + String tbl = AliastoTabName.get(entry.getValue().get(0).getConf().getAlias()); + if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)) { + table2RSops.put(tbl, new ArrayList()); + table2TSops.put(tbl, new ArrayList()); + } + assert entry.getValue().size() == 1; + table2RSops.get(tbl).add(entry.getKey()); + table2TSops.get(tbl).add(entry.getValue().get(0)); + } + + for (Entry> entry: table2RSops.entrySet()) { + if (entry.getValue().size() > 1) { + hasICandTC = true; + } + } + + + correlation.setICandTCCorrelation(table2RSops, table2TSops); + + return hasICandTC; + } + } + + private NodeProcessor getDefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { + LOG.info("Walk to operator " + ((Operator)nd).getIdentifier() + " " + ((Operator)nd).getName()); + return null; + } + }; + } + + private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx { + + } + + + public class IntraQueryCorrelation{ + + private final HashMap> down2upRSops = new HashMap>(); + private final HashMap> up2downRSops = new HashMap>(); + + private final HashMap> top2TSops = new HashMap>(); + private final HashMap> bottom2TSops = new HashMap>(); + + private ArrayList topReduceSinkOperators; + private ArrayList bottomReduceSinkOperators; + + private HashMap> table2CorrelatedRSops; + + private HashMap> table2CorrelatedTSops; + + private HashMap bottomReduceSink2OperationPathMap; + + private final HashMap>> dispatchConf = + new HashMap>>(); //inputTag->(Child->outputTag) + private final HashMap>> dispatchValueSelectDescConf = + new HashMap>>(); //inputTag->(Child->SelectDesc) + private final HashMap>> dispatchKeySelectDescConf = + new HashMap>>(); //inputTag->(Child->SelectDesc) + + private final HashSet allReduceSinkOperators = new HashSet(); + + // this set contains all ReduceSink-GroupBy operator-pairs that should be be replaced by GroupBy-ReduceSink-GroupBy pattern. + // the type of first GroupByOperator is hash type and this one will be used to group records. + private final HashSet rSGBYToBeReplacedByGBYRSGBY = new HashSet(); + + public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop) { + rSGBYToBeReplacedByGBYRSGBY.add(rsop); + } + + public HashSet getRSGBYToBeReplacedByGBYRSGBY() { + return rSGBYToBeReplacedByGBYRSGBY; + } + + public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) { + allReduceSinkOperators.add(rsop); + } + + public HashSet getAllReduceSinkOperators() { + return allReduceSinkOperators; + } + + public HashMap>> getDispatchConf() { + return dispatchConf; + } + + public HashMap>> getDispatchValueSelectDescConf() { + return dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return dispatchKeySelectDescConf; + } + + public void addOperationPathToDispatchConf(Integer opPlan) { + if (!dispatchConf.containsKey(opPlan)) { + dispatchConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchConfForOperationPath(Integer opPlan) { + return dispatchConf.get(opPlan); + } + + public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) { + if (!dispatchValueSelectDescConf.containsKey(opPlan)) { + dispatchValueSelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchValueSelectDescConfForOperationPath(Integer opPlan) { + return dispatchValueSelectDescConf.get(opPlan); + } + + public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) { + if (!dispatchKeySelectDescConf.containsKey(opPlan)) { + dispatchKeySelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchKeySelectDescConfForOperationPath(Integer opPlan) { + return dispatchKeySelectDescConf.get(opPlan); + } + + private boolean inputCorrelation = false; + private boolean transitCorrelation = false; + private boolean jobFlowCorrelation = false; + + public void setBottomReduceSink2OperationPathMap(HashMap bottomReduceSink2OperationPathMap) { + this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap; + } + + public HashMap getBottomReduceSink2OperationPathMap() { + return bottomReduceSink2OperationPathMap; + } + + public void setInputCorrelation(boolean inputCorrelation) { + this.inputCorrelation = 
inputCorrelation; + } + + public boolean hasInputCorrelation() { + return inputCorrelation; + } + + public void setTransitCorrelation(boolean transitCorrelation) { + this.transitCorrelation = transitCorrelation; + } + + public boolean hasTransitCorrelation() { + return transitCorrelation; + } + + public void setJobFlowCorrelation(boolean jobFlowCorrelation) { + this.jobFlowCorrelation = jobFlowCorrelation; + } + + public boolean hasJobFlowCorrelation() { + return jobFlowCorrelation; + } + + public HashMap> getTop2TSops() { + return top2TSops; + } + + public HashMap> getBottom2TSops() { + return bottom2TSops; + } + + public HashMap> getDown2upRSops() { + return down2upRSops; + } + + public HashMap> getUp2downRSops() { + return up2downRSops; + } + + public void setJFCCorrelation(ArrayList peerReduceSinkOperators, + ArrayList bottomReduceSinkOperators) { + this.topReduceSinkOperators = peerReduceSinkOperators; + this.bottomReduceSinkOperators = bottomReduceSinkOperators; + } + + + public ArrayList getTopReduceSinkOperators() { + return topReduceSinkOperators; + } + + public ArrayList getBottomReduceSinkOperators() { + return bottomReduceSinkOperators; + } + + public void setICandTCCorrelation(HashMap> table2RSops, + HashMap> table2TSops) { + this.table2CorrelatedRSops = table2RSops; + this.table2CorrelatedTSops = table2TSops; + } + + public HashMap> getTable2CorrelatedRSops() { + return table2CorrelatedRSops; + } + + public HashMap> getTable2CorrelatedTSops() { + return table2CorrelatedTSops; + } + + private boolean isInvolveSelfJoin = false; + + public boolean isInvolveSelfJoin() { + return isInvolveSelfJoin; + } + + public void setInvolveSelfJoin(boolean isInvolveSelfJoin) { + this.isInvolveSelfJoin = isInvolveSelfJoin; + } + + } + + private class CorrelationNodeProcCtx implements NodeProcessorCtx { + + private final HashSet walked = new HashSet(); + + private final ArrayList correlations = new ArrayList(); + + public void addCorrelation(IntraQueryCorrelation correlation) { + correlations.add(correlation); + } + + public ArrayList getCorrelations() { + return correlations; + } + + + public boolean isWalked(ReduceSinkOperator op) { + return walked.contains(op); + } + + public void addWalked(ReduceSinkOperator op) { + walked.add(op); + } + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (working copy) @@ -81,6 +81,24 @@ transient protected int numDistributionKeys; transient protected int numDistinctExprs; + + private final ArrayList operationPathTags = new ArrayList(); // operation path tags + private final byte[] operationPathTagsByte = new byte[1]; + + public void setOperationPathTags(ArrayList operationPathTags) { + this.operationPathTags.addAll(operationPathTags); + int operationPathTagsInt = 0; + int tmp = 1; + for (Integer operationPathTag: operationPathTags) { + operationPathTagsInt += tmp << operationPathTag.intValue(); + } + operationPathTagsByte[0] = (byte) operationPathTagsInt; + } + + public ArrayList getOperationPathTags() { + return this.operationPathTags; + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { @@ -267,9 +285,18 @@ keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); - keyWritable.setSize(keyLength + 1); + if 
(!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.setSize(keyLength + 1); + } else { + keyWritable.setSize(keyLength + 2); + } System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = tagByte[0]; + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.get()[keyLength] = tagByte[0]; + } else { + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } } } else { // Must be BytesWritable @@ -279,9 +306,18 @@ keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); - keyWritable.setSize(keyLength + 1); - System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = tagByte[0]; + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.setSize(keyLength + 1); + } else { + keyWritable.setSize(keyLength + 2); + } + System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.get()[keyLength] = tagByte[0]; + } else { + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } } } keyWritable.setHashCode(keyHashCode); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java (revision 0) @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationManualForwardDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; + +/** + * Correlation Manual Forward Operator. Collect row and forward it by requests. + * CorrelationManualForwardOperator is a sub-operaotr within CorrelationCompositeOperator. 
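A small, self-contained illustration of the two tagging details added to ReduceSinkOperator above, using plain arrays instead of Hive writables (packOperationPathTags and tagKey are hypothetical names): the set of operation-path ids is packed into one bitmask byte, and when needsOperationPathTagging is enabled the serialized key carries that byte followed by the usual tag byte.

import java.util.Arrays;
import java.util.List;

public class OperationPathKeyLayoutSketch {
  static byte packOperationPathTags(List<Integer> operationPathTags) {
    int bits = 0;
    for (Integer pathTag : operationPathTags) {
      bits += 1 << pathTag.intValue();   // same shift-and-add as setOperationPathTags
    }
    return (byte) bits;
  }

  static byte[] tagKey(byte[] keyBytes, byte operationPathTags, byte tag) {
    // Layout when tagging is on: keyBytes | operationPathTags | tag
    byte[] out = Arrays.copyOf(keyBytes, keyBytes.length + 2);
    out[keyBytes.length] = operationPathTags;  // which operation paths this row feeds
    out[keyBytes.length + 1] = tag;            // the ordinary ReduceSink tag
    return out;
  }

  public static void main(String[] args) {
    byte packed = packOperationPathTags(Arrays.asList(0, 2));
    System.out.println(packed);                   // operation paths {0, 2} -> bitmask 5
    byte[] tagged = tagKey(new byte[] {10, 20}, packed, (byte) 1);
    System.out.println(Arrays.toString(tagged));  // [10, 20, 5, 1]
  }
}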
+ **/ +public class CorrelationManualForwardOperator extends Operator implements + Serializable { + private static final long serialVersionUID = 1L; + + private Object row; + private int tag; + + public Object getRow() { + return this.row; + } + + public int getTag() { + return this.tag; + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + this.row = row; + this.tag = tag; + } + + public void forwardIt() throws HiveException { + forward(row, inputObjInspectors[tag]); + } + + public void clean() { + this.row = null; + this.tag = -1; + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CMF"); + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (revision 0) @@ -0,0 +1,727 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
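A sketch of the buffer-then-forward protocol that the enclosing CorrelationCompositeOperator presumably drives; the driver method below is hypothetical (the composite operator itself is not shown in this hunk) and assumes the operator has already been initialized by the framework.

import org.apache.hadoop.hive.ql.exec.CorrelationManualForwardOperator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

class ManualForwardProtocolSketch {
  static void pushAndRelease(CorrelationManualForwardOperator cmf,
      Object row, int tag, boolean rowWanted) throws HiveException {
    cmf.processOp(row, tag);   // cache only; nothing is forwarded yet
    if (rowWanted) {
      cmf.forwardIt();         // forward the cached row to the children
    }
    cmf.clean();               // drop the cached row before the next one arrives
  }
}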
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator; +import org.apache.hadoop.hive.ql.exec.CorrelationFakeReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.ForwardDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; + + +public final class CorrelationOptimizerUtils { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName()); + + public static boolean isExisted(ExprNodeDesc expr, ArrayList col_list) { + for (ExprNodeDesc thisExpr: col_list) { + if (expr.getExprString().equals(thisExpr.getExprString())) { + return true; + } + } + return false; + } + + public static String getColumnName(Map opColumnExprMap, ExprNodeDesc expr) { + for (Entry entry: opColumnExprMap.entrySet()) { + if (expr.getExprString().equals(entry.getValue().getExprString())) { + return entry.getKey(); + } + } + return null; + } + + + public static Operator unionUsedColumnsAndMakeNewSelect(ArrayList rsops, + IntraQueryCorrelation correlation, LinkedHashMap, + Map> originalOpColumnExprMap, TableScanOperator input, ParseContext pGraphContext, + LinkedHashMap, OpParseContext> originalOpParseCtx) { + + ArrayList columnNames = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList col_list = new ArrayList(); + RowResolver out_rwsch = new RowResolver(); + boolean isSelectAll = false; + + int pos = 0; + for (ReduceSinkOperator rsop: rsops) { + Operator curr = correlation.getBottom2TSops().get(rsop).get(0).getChildOperators().get(0); + while(true) { + if (curr.getName().equals("SEL")) { + SelectOperator selOp = (SelectOperator)curr; 
+ if (selOp.getColumnExprMap() != null) { + for (Entry entry: selOp.getColumnExprMap().entrySet()) { + ExprNodeDesc expr = entry.getValue(); + if (!isExisted(expr, col_list) && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().containsKey(entry.getKey())) { + col_list.add(expr); + String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().get(entry.getKey()); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = entry.getKey(); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + pos++; + columnNames.add(outputName); + colExprMap.put(outputName, expr); + } + } + } else { + for (ExprNodeDesc expr: selOp.getConf().getColList()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver().getInvRslvMap().get(expr.getCols().get(0)); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + } + break; + } else if (curr.getName().equals("FIL")) { + isSelectAll = true; + break; + } else if (curr.getName().equals("RS")) { + ReduceSinkOperator thisRSop = (ReduceSinkOperator)curr; + for (ExprNodeDesc expr: thisRSop.getConf().getKeyCols()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + for (ExprNodeDesc expr: thisRSop.getConf().getValueCols()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + + break; + } else { + curr = curr.getChildOperators().get(0); + } + } + } + + Operator output; + if (isSelectAll) { + output = input; + } else { + output = putOpInsertMap(OperatorFactory.getAndMakeChild( + new SelectDesc(col_list, columnNames, false), new RowSchema( + out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx()); + output.setColumnExprMap(colExprMap); + output.setChildOperators(Utilities.makeList()); + + } + + return output; + } + + + public static Operator putOpInsertMap(Operator op, + RowResolver rr, LinkedHashMap, OpParseContext> opParseCtx) { + OpParseContext ctx = new OpParseContext(rr); + opParseCtx.put(op, ctx); + op.augmentPlan(); + return op; + } + + public static HashMap, String> getAliasIDtTopOps(HashMap> topOps) { + HashMap, String> aliasIDtTopOps = new HashMap, String>(); + 
for (Entry> entry: topOps.entrySet()) { + assert !aliasIDtTopOps.containsKey(entry.getValue()); + aliasIDtTopOps.put(entry.getValue(), entry.getKey()); + } + return aliasIDtTopOps; + } + + public static ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + public static ArrayList findPeerFakeReduceSinkOperators(CorrelationFakeReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((CorrelationFakeReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + // find how many layer's of Fake reduce sink + public static int getPostComputationDepth(IntraQueryCorrelation correlation) { + int depth = 0; + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + ReduceSinkOperator op = rsop; + int layer = 0; + while(!correlation.getTopReduceSinkOperators().contains(op)) { + assert correlation.getDown2upRSops().get(op).size() == 1; + op = correlation.getDown2upRSops().get(op).get(0); + layer++; + } + if (layer > depth) { + depth = layer; + } + } + assert depth >= 1; + return depth; + } + + + public static int getPostComputationDepthOfThisPlan(ReduceSinkOperator rsop, IntraQueryCorrelation correlation) { + int depth = 0; + ReduceSinkOperator op = rsop; + while(!correlation.getTopReduceSinkOperators().contains(op)) { + assert correlation.getDown2upRSops().get(op).size() == 1; + op = correlation.getDown2upRSops().get(op).get(0); + depth++; + } + assert depth >= 1; + return depth; + } + + public static ParseContext applyCorrelation(IntraQueryCorrelation correlation, ParseContext inputpGraphContext, + LinkedHashMap, Map> originalOpColumnExprMap, + LinkedHashMap, RowResolver> originalOpRowResolver, + Map groupbyRegular2MapSide, + LinkedHashMap, OpParseContext> originalOpParseCtx) { + + ParseContext pGraphContext = inputpGraphContext; + + // 0: if necessary, replace RS-GBY to GBY-RS-GBY. 
In GBY-RS-GBY, the first GBY is in type of hash, so it can group records + LOG.info("apply correlation step 0: replace RS-GBY to GBY-RS-GBY"); + for (ReduceSinkOperator rsop: correlation.getRSGBYToBeReplacedByGBYRSGBY()) { + LOG.info("operator " + rsop.getIdentifier() + " should be replaced"); + assert !correlation.getBottomReduceSinkOperators().contains(rsop); + GroupByOperator mapSideGBY = groupbyRegular2MapSide.get(rsop); + assert (mapSideGBY.getChildOperators().get(0).getChildOperators().get(0) instanceof GroupByOperator); + ReduceSinkOperator newRsop = (ReduceSinkOperator)mapSideGBY.getChildOperators().get(0); + GroupByOperator reduceSideGBY = (GroupByOperator)newRsop.getChildOperators().get(0); + GroupByOperator oldReduceSideGBY = (GroupByOperator)rsop.getChildOperators().get(0); + List> parents = rsop.getParentOperators(); + List> children = oldReduceSideGBY.getChildOperators(); + mapSideGBY.setParentOperators(parents); + for (Operator parent: parents) { + parent.replaceChild(rsop, mapSideGBY); + } + reduceSideGBY.setChildOperators(children); + for (Operator child: children) { + child.replaceParent(oldReduceSideGBY, reduceSideGBY); + } + correlation.getAllReduceSinkOperators().remove(rsop); + correlation.getAllReduceSinkOperators().add(newRsop); + } + + + Operator curr; + + // 1: Create table scan operator + LOG.info("apply correlation step 1: create table scan operator"); + HashMap oldTSOP2newTSOP = new HashMap(); + HashMap> oldTopOps = pGraphContext.getTopOps(); + HashMap, String> oldAliasIDtTopOps = getAliasIDtTopOps(oldTopOps); + HashMap oldTopToTable = pGraphContext.getTopToTable(); + HashMap> addedTopOps = new HashMap>(); + HashMap addedTopToTable = new HashMap(); + for (Entry> entry: correlation.getTable2CorrelatedTSops().entrySet()) { + TableScanOperator oldTSop = entry.getValue().get(0); + TableScanDesc tsDesc = new TableScanDesc(oldTSop.getConf().getAlias(), oldTSop.getConf().getVirtualCols()); + OpParseContext opParseCtx= pGraphContext.getOpParseCtx().get(oldTSop); + Operator top = putOpInsertMap(OperatorFactory.get(tsDesc, + new RowSchema(opParseCtx.getRowResolver().getColumnInfos())), + opParseCtx.getRowResolver(), pGraphContext.getOpParseCtx()); + top.setParentOperators(null); + top.setChildOperators(Utilities.makeList()); + for (TableScanOperator tsop: entry.getValue()) { + addedTopOps.put(oldAliasIDtTopOps.get(tsop), top); + addedTopToTable.put((TableScanOperator) top, oldTopToTable.get(tsop)); + oldTSOP2newTSOP.put(tsop, (TableScanOperator)top); + } + } + + int postComputationDepth = getPostComputationDepth(correlation); + ArrayList> childrenOfDispatch = new ArrayList>(); + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + int thisPostComputationDepth = getPostComputationDepthOfThisPlan(rsop, correlation); + // TODO: currently, correlation optimizer can not handle the case that + // a table is directly connected to a operator. 
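Companion sketch for step 0 above: in the map-side-aggregation form the operators are chained as GBY(hash) -> RS -> GBY(reduce side), so the pieces spliced into the plan can be reached by following single-child links from the hash GroupByOperator stored in groupbyRegular2MapSide. The helper below is hypothetical and only mirrors the navigation done in applyCorrelation.

import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;

class MapSideGroupByChainSketch {
  static ReduceSinkOperator reduceSinkOf(GroupByOperator mapSideHashGBY) {
    // The hash-mode GroupByOperator has exactly one child: the ReduceSink.
    return (ReduceSinkOperator) mapSideHashGBY.getChildOperators().get(0);
  }

  static GroupByOperator reduceSideGroupByOf(GroupByOperator mapSideHashGBY) {
    // ... and the ReduceSink's single child is the reduce-side GroupByOperator.
    return (GroupByOperator) mapSideHashGBY.getChildOperators()
        .get(0).getChildOperators().get(0);
  }
}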
+ assert correlation.getBottomReduceSinkOperators().containsAll(findPeerReduceSinkOperators(rsop)); + Operator op = rsop.getChildOperators().get(0); + if (!childrenOfDispatch.contains(op)) { + LOG.info("Add :" + op.getIdentifier() + " " + op.getName() + " to the children list of dispatch operator"); + childrenOfDispatch.add(op); + } + } + + int opTag = 0; + HashMap operationPath2CorrelationReduceSinkOps = new HashMap(); + for (Entry> entry: correlation.getTable2CorrelatedRSops().entrySet()) { + + // 2: Create select operator for shared op plans + LOG.info("apply correlation step 2: create select operator for shared operation path for the table of " + entry.getKey()); + curr = unionUsedColumnsAndMakeNewSelect(entry.getValue(), correlation, originalOpColumnExprMap, + oldTSOP2newTSOP.get(correlation.getBottom2TSops().get(entry.getValue().get(0)).get(0)), pGraphContext, + originalOpParseCtx); + + // 3: Create CorrelationCompositeOperator, CorrelationReduceSinkOperator + LOG.info("apply correlation step 3: create correlation composite Operator and correlation reduce sink operator for the table of " + entry.getKey()); + curr = createCorrelationCompositeReducesinkOperaotr( + correlation.getTable2CorrelatedTSops().get(entry.getKey()), entry.getValue(), correlation, curr, pGraphContext, + childrenOfDispatch, entry.getKey(), originalOpColumnExprMap, opTag, originalOpRowResolver); + + operationPath2CorrelationReduceSinkOps.put(new Integer(opTag), (ReduceSinkOperator)curr); + opTag++; + } + + + // 4: Create correlation dispatch operator for operation paths + LOG.info("apply correlation step 4: create correlation dispatch operator for operation paths"); + RowResolver outputRS = new RowResolver(); + List> correlationReduceSinkOps = new ArrayList>(); + for (Entry entry: operationPath2CorrelationReduceSinkOps.entrySet()) { + Integer opTagInteger = entry.getKey(); + curr = entry.getValue(); + correlationReduceSinkOps.add((ReduceSinkOperator)curr); + RowResolver inputRS = pGraphContext.getOpParseCtx().get(curr).getRowResolver(); + for (Entry> e1: inputRS.getRslvMap().entrySet()) { + for (Entry e2: e1.getValue().entrySet()) { + outputRS.put(e1.getKey(), e2.getKey(), e2.getValue()); + } + } + } + + Operator dispatchOp = putOpInsertMap(OperatorFactory.get( + new CorrelationReducerDispatchDesc(correlation.getDispatchConf(), correlation.getDispatchKeySelectDescConf(), correlation.getDispatchValueSelectDescConf()), + new RowSchema(outputRS.getColumnInfos())), + outputRS, pGraphContext.getOpParseCtx()); + + dispatchOp.setParentOperators(correlationReduceSinkOps); + for (Operator thisOp: correlationReduceSinkOps) { + thisOp.setChildOperators(Utilities.makeList(dispatchOp)); + } + + // 5: Replace the old plan in the original plan tree with new plan + LOG.info("apply correlation step 5: Replace the old plan in the original plan tree with the new plan"); + HashSet> processed = new HashSet>(); + for (Operator op: childrenOfDispatch) { + ArrayList> parents = new ArrayList>(); + for (Operator oldParent: op.getParentOperators()) { + if (!correlation.getBottomReduceSinkOperators().contains(oldParent)) { + parents.add(oldParent); + } + } + parents.add(dispatchOp); + op.setParentOperators(parents); + } + dispatchOp.setChildOperators(childrenOfDispatch); + HashMap> newTopOps = new HashMap>(); + for (Entry> entry: oldTopOps.entrySet()) { + if (addedTopOps.containsKey(entry.getKey())) { + newTopOps.put(entry.getKey(), addedTopOps.get(entry.getKey())); + } else { + newTopOps.put(entry.getKey(), entry.getValue()); + } + } + 
pGraphContext.setTopOps(newTopOps); + HashMap newTopToTable = new HashMap(); + for (Entry entry: oldTopToTable.entrySet()) { + if (addedTopToTable.containsKey(oldTSOP2newTSOP.get(entry.getKey()))) { + newTopToTable.put(oldTSOP2newTSOP.get(entry.getKey()), + addedTopToTable.get(oldTSOP2newTSOP.get(entry.getKey()))); + } else { + newTopToTable.put(entry.getKey(), entry.getValue()); + } + } + pGraphContext.setTopToTable(newTopToTable); + + // 6: Change each JFC related ReduceSinkOperator to a CorrelationFakeReduceSinkOperator + LOG.info("apply correlation step 6: Change each JFC related reduce sink operator to a correlation fake reduce sink operator"); + HashMap, ArrayList>> newParentsOfChildren = + new HashMap, ArrayList>>(); + for (ReduceSinkOperator rsop: correlation.getAllReduceSinkOperators()) { + if (!correlation.getBottomReduceSinkOperators().contains(rsop)) { + Operator childOP = rsop.getChildOperators().get(0); + Operator parentOP = rsop.getParentOperators().get(0); + Operator correlationFakeReduceSinkOperator = putOpInsertMap(OperatorFactory.get( + new CorrelationFakeReduceSinkDesc(rsop.getConf()), + new RowSchema(pGraphContext.getOpParseCtx().get(rsop).getRowResolver().getColumnInfos())), + pGraphContext.getOpParseCtx().get(rsop).getRowResolver(), pGraphContext.getOpParseCtx()); + correlationFakeReduceSinkOperator.setChildOperators(Utilities.makeList(childOP)); + correlationFakeReduceSinkOperator.setParentOperators(Utilities.makeList(parentOP)); + parentOP.getChildOperators().set(parentOP.getChildOperators().indexOf(rsop), correlationFakeReduceSinkOperator); + childOP.getParentOperators().set(childOP.getParentOperators().indexOf(rsop), correlationFakeReduceSinkOperator); + } + } + + return pGraphContext; + } + + public static Operator createCorrelationCompositeReducesinkOperaotr( + ArrayList tsops, ArrayList rsops, + IntraQueryCorrelation correlation, + Operator input, ParseContext pGraphContext, + ArrayList> childrenOfDispatch, String tableName, + LinkedHashMap, Map> originalOpColumnExprMap, int newTag, + LinkedHashMap, RowResolver> originalOpRowResolver) { + + // Create CorrelationCompositeOperator + RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver(); + ArrayList> tops = new ArrayList>(); + ArrayList> bottoms = new ArrayList>(); + ArrayList opTags = new ArrayList(); + + for (ReduceSinkOperator rsop: rsops) { + TableScanOperator tsop = correlation.getBottom2TSops().get(rsop).get(0); + Operator curr = tsop.getChildOperators().get(0); + if (curr == rsop) { + // no filter needed, just forward + ForwardDesc forwardCtx = new ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + } else { + // Add filter operator + FilterOperator currFilOp = null; + while(curr != rsop) { + if (curr.getName().equals("FIL")) { + FilterOperator fil = (FilterOperator)curr; + FilterDesc filterCtx = new FilterDesc(fil.getConf().getPredicate(), false); + Operator nowFilOp = OperatorFactory.get(FilterDesc.class); + nowFilOp.setConf(filterCtx); + if (currFilOp == null) { + currFilOp = (FilterOperator)nowFilOp; + tops.add(currFilOp); + } else { + nowFilOp.setParentOperators(Utilities.makeList(currFilOp)); + currFilOp.setChildOperators(Utilities.makeList(nowFilOp)); + currFilOp = (FilterOperator) nowFilOp; + } + } + curr = curr.getChildOperators().get(0); + } + if (currFilOp == null) { + ForwardDesc 
forwardCtx = new ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + } else { + bottoms.add(currFilOp); + } + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + + } + } + + int[] opTagsArray = new int[opTags.size()]; + for (int i=0; i ycop = putOpInsertMap(OperatorFactory.getAndMakeChild(ycoCtx, + new RowSchema(inputRR.getColumnInfos()), input), + inputRR, pGraphContext.getOpParseCtx()); + + // Create CorrelationReduceSinkOperator + ArrayList partitionCols = new ArrayList(); + ArrayList keyCols = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList keyOutputColumnNames = new ArrayList(); + ReduceSinkOperator firstRsop = rsops.get(0); + + RowResolver firstRsopRS = pGraphContext.getOpParseCtx().get(firstRsop).getRowResolver(); + RowResolver orginalFirstRsopRS = originalOpRowResolver.get(firstRsop); + RowResolver outputRS = new RowResolver(); + HashMap keyCol2ExprForDispatch = new HashMap(); + HashMap valueCol2ExprForDispatch = new HashMap(); + + for (ExprNodeDesc expr: firstRsop.getConf().getKeyCols()) { + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr; + String ouputName = getColumnName(originalOpColumnExprMap.get(firstRsop), expr); + ColumnInfo cinfo = orginalFirstRsopRS.getColumnInfos().get(orginalFirstRsopRS.getPosition(ouputName)); + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size()); + keyOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + + colExprMap.put(newColInfo.getInternalName(), expr); + + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + keyCols.add(expr); + + keyCol2ExprForDispatch.put(encd.getColumn(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName, + encd.getIsPartitionColOrVirtualCol())); + + } + + ArrayList valueCols = new ArrayList(); + ArrayList valueOutputColumnNames = new ArrayList(); + + correlation.addOperationPathToDispatchConf(newTag); + correlation.addOperationPathToDispatchKeySelectDescConf(newTag); + correlation.addOperationPathToDispatchValueSelectDescConf(newTag); + + + for (ReduceSinkOperator rsop: rsops) { + RowResolver rs = pGraphContext.getOpParseCtx().get(rsop).getRowResolver(); + RowResolver orginalRS = originalOpRowResolver.get(rsop); + Integer childOpIndex = childrenOfDispatch.indexOf(rsop.getChildOperators().get(0)); + int outputTag = rsop.getConf().getTag(); + if (outputTag == -1) { + outputTag = 0; + } + if (!correlation.getDispatchConfForOperationPath(newTag).containsKey(childOpIndex)) { + correlation.getDispatchConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchConfForOperationPath(newTag).get(childOpIndex).add(outputTag); + + ArrayList thisKeyColsInDispatch = new ArrayList(); + ArrayList outputKeyNamesInDispatch = new ArrayList(); + for (ExprNodeDesc expr: rsop.getConf().getKeyCols()) { + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr; + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + thisKeyColsInDispatch.add(keyCol2ExprForDispatch.get(encd.getColumn())); + String[] names = outputName.split("\\."); + outputKeyNamesInDispatch.add(names[1]); + } + + if (!correlation.getDispatchKeySelectDescConfForOperationPath(newTag).containsKey(childOpIndex)) { + 
correlation.getDispatchKeySelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchKeySelectDescConfForOperationPath(newTag).get(childOpIndex). + add(new SelectDesc(thisKeyColsInDispatch, outputKeyNamesInDispatch, false)); + + ArrayList thisValueColsInDispatch = new ArrayList(); + ArrayList outputValueNamesInDispatch = new ArrayList(); + for (ExprNodeDesc expr: rsop.getConf().getValueCols()) { + + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + ColumnInfo cinfo = orginalRS.getColumnInfos().get(orginalRS.getPosition(outputName)); + if (!valueCol2ExprForDispatch.containsKey(expr.getExprString())) { + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size() + valueCols.size()); + valueOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + colExprMap.put(newColInfo.getInternalName(), expr); + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + valueCols.add(expr); + + valueCol2ExprForDispatch.put(expr.getExprString(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName, + false)); + } + + thisValueColsInDispatch.add(valueCol2ExprForDispatch.get(expr.getExprString())); + String[] names = outputName.split("\\."); + outputValueNamesInDispatch.add(names[1]); + } + + if (!correlation.getDispatchValueSelectDescConfForOperationPath(newTag).containsKey(childOpIndex)) { + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).get(childOpIndex). + add(new SelectDesc(thisValueColsInDispatch, outputValueNamesInDispatch, false)); + } + + ReduceSinkOperator rsOp = null; + try { + rsOp = (ReduceSinkOperator) putOpInsertMap( + OperatorFactory.getAndMakeChild(getReduceSinkDesc(keyCols, + keyCols.size(), valueCols, new ArrayList>(), + keyOutputColumnNames, valueOutputColumnNames, true, newTag, keyCols.size(), + -1), new RowSchema(outputRS + .getColumnInfos()), ycop), outputRS, pGraphContext.getOpParseCtx()); + rsOp.setColumnExprMap(colExprMap); + ((CorrelationCompositeOperator)ycop).getConf().setCorrespondingReduceSinkOperator(rsOp); + } catch (SemanticException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return rsOp; + } + + + /** + * Create the correlation reduce sink descriptor. + * + * @param keyCols + * The columns to be stored in the key + * @param numKeys number of distribution keys. Usually equals the number of + * group-by keys. + * @param valueCols + * The columns to be stored in the value + * @param distinctColIndices + * column indices for distinct aggregates + * @param outputKeyColumnNames + * The output key column names + * @param outputValueColumnNames + * The output value column names + * @param tag + * The tag for this reducesink + * @param numPartitionFields + * The first numPartitionFields of keyCols will be partition columns. + * If numPartitionFields=-1, then partition randomly. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The ReduceSinkDesc object. 
+ */ + public static ReduceSinkDesc getReduceSinkDesc( + ArrayList keyCols, int numKeys, + ArrayList valueCols, + List> distinctColIndices, + List outputKeyColumnNames, List outputValueColumnNames, + boolean includeKey, int tag, + int numPartitionFields, int numReducers) throws SemanticException { + ArrayList partitionCols = null; + + if (numPartitionFields >= keyCols.size()) { + partitionCols = keyCols; + } else if (numPartitionFields >= 0) { + partitionCols = new ArrayList(numPartitionFields); + for (int i = 0; i < numPartitionFields; i++) { + partitionCols.add(keyCols.get(i)); + } + } else { + // numPartitionFields = -1 means random partitioning + partitionCols = new ArrayList(1); + partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor + .getFuncExprNodeDesc("rand")); + } + + StringBuilder order = new StringBuilder(); + for (int i = 0; i < keyCols.size(); i++) { + order.append("+"); + } + + TableDesc keyTable = null; + TableDesc valueTable = null; + ArrayList outputKeyCols = new ArrayList(); + ArrayList outputValCols = new ArrayList(); + if (includeKey) { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnListWithLength( + keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""), + order.toString()); + outputKeyCols.addAll(outputKeyColumnNames); + } else { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList( + keyCols, "reducesinkkey"),order.toString()); + for (int i = 0; i < keyCols.size(); i++) { + outputKeyCols.add("reducesinkkey" + i); + } + } + valueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueCols, outputValueColumnNames, 0, "")); + outputValCols.addAll(outputValueColumnNames); + + return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols, + distinctColIndices, outputValCols, + tag, partitionCols, numReducers, keyTable, + valueTable, true); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (working copy) @@ -61,6 +61,7 @@ private Reporter rp; private boolean abort = false; private boolean isTagged = false; + private boolean isOperationPathTagged = false; //If operation path is tagged private long cntr = 0; private long nextCntr = 1; @@ -116,6 +117,7 @@ reducer.setParentOperators(null); // clear out any parents as reducer is the // root isTagged = gWork.getNeedsTagging(); + isOperationPathTagged = gWork.getNeedsOperationPathTagging(); try { keyTableDesc = gWork.getKeyDesc(); inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc @@ -164,8 +166,9 @@ private BytesWritable groupKey; - ArrayList row = new ArrayList(3); + ArrayList row = new ArrayList(4); ByteWritable tag = new ByteWritable(); + ByteWritable operationPathTags = new ByteWritable(); public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { @@ -188,6 +191,14 @@ keyWritable.setSize(size); } + operationPathTags.set((byte)0); + if (isOperationPathTagged) { + // remove the operation plan tag + int size = keyWritable.getSize() - 1; + operationPathTags.set(keyWritable.get()[size]); + keyWritable.setSize(size); + } + if (!keyWritable.equals(groupKey)) { // If a operator wants to do some work at the beginning of a group if (groupKey == null) { // the first group @@ -234,6 +245,7 @@ 
row.add(valueObject[tag.get()]); // The tag is not used any more, we should remove it. row.add(tag); + row.add(operationPathTags); if (isLogInfoEnabled) { cntr++; if (cntr == nextCntr) { Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java (revision 0) @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map.Entry; + + +/** + * Correlation dispatch operator Descriptor implementation. + * + */ +@Explain(displayName = "Correlation Dispatch Operator") +public class CorrelationReducerDispatchDesc implements Serializable { + + private static final long serialVersionUID = 1L; + + + private HashMap>> dispatchConf; + private HashMap>> dispatchValueSelectDescConf; + private HashMap>> dispatchKeySelectDescConf; + + public CorrelationReducerDispatchDesc(){ + this.dispatchConf = new HashMap>>(); + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + + } + + public CorrelationReducerDispatchDesc(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + for(Entry>> entry: this.dispatchConf.entrySet()){ + HashMap> tmp = new HashMap>(); + for(Integer child: entry.getValue().keySet()){ + tmp.put(child, new ArrayList()); + tmp.get(child).add(new SelectDesc(true)); + } + this.dispatchValueSelectDescConf.put(entry.getKey(), tmp); + this.dispatchKeySelectDescConf.put(entry.getKey(), tmp); + } + } + + public CorrelationReducerDispatchDesc(HashMap>> dispatchConf, + HashMap>> dispatchKeySelectDescConf, + HashMap>> dispatchValueSelectDescConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public void setDispatchConf(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + } + + public HashMap>> getDispatchConf(){ + return this.dispatchConf; + } + + public void setDispatchValueSelectDescConf(HashMap>> dispatchValueSelectDescConf){ + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchValueSelectDescConf(){ + return this.dispatchValueSelectDescConf; + } + + public void setDispatchKeySelectDescConf(HashMap>> dispatchKeySelectDescConf){ + this.dispatchKeySelectDescConf = 
dispatchKeySelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return this.dispatchKeySelectDescConf; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; @@ -58,7 +59,6 @@ import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapRedTask; -import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RecordReader; @@ -184,7 +184,7 @@ private List loadTableWork; private List loadFileWork; private Map joinContext; - private final HashMap topToTable; + private HashMap topToTable; private QB qb; private ASTNode ast; private int destTableId; @@ -205,6 +205,9 @@ private final UnparseTranslator unparseTranslator; private final GlobalLimitCtx globalLimitCtx = new GlobalLimitCtx(); + // store the variable ParseCOntext.groupbyRegular2MapSide + Map groupbyRegular2MapSide; + //prefix for column names auto generated by hive private final String autogenColAliasPrfxLbl; private final boolean autogenColAliasPrfxIncludeFuncName; @@ -285,6 +288,7 @@ autogenColAliasPrfxIncludeFuncName = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME); queryProperties = new QueryProperties(); + groupbyRegular2MapSide = new HashMap(); } @Override @@ -303,6 +307,8 @@ opParseCtx.clear(); groupOpToInputTables.clear(); prunedPartitions.clear(); + topToTable.clear(); + groupbyRegular2MapSide.clear(); } public void init(ParseContext pctx) { @@ -310,6 +316,7 @@ opToPartList = pctx.getOpToPartList(); opToSamplePruner = pctx.getOpToSamplePruner(); topOps = pctx.getTopOps(); + topToTable = pctx.getTopToTable(); topSelOps = pctx.getTopSelOps(); opParseCtx = pctx.getOpParseCtx(); loadTableWork = pctx.getLoadTableWork(); @@ -324,6 +331,7 @@ groupOpToInputTables = pctx.getGroupOpToInputTables(); prunedPartitions = pctx.getPrunedPartitions(); setLineageInfo(pctx.getLineageInfo()); + groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide(); } public ParseContext getParseContext() { @@ -331,7 +339,8 @@ topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, + groupbyRegular2MapSide); } @SuppressWarnings("nls") @@ -2908,6 +2917,7 @@ ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false); reduceSinkOutputRowResolver.putExpression(parameter, colInfo); numExprs++; + colExprMap.put(colInfo.getInternalName(), expr); } distinctColIndices.add(distinctIndices); } @@ -2925,15 +2935,18 @@ for (int i = 1; i < value.getChildCount(); i++) 
{ ASTNode parameter = (ASTNode) value.getChild(i); if (reduceSinkOutputRowResolver.getExpression(parameter) == null) { - reduceValues.add(genExprNodeDesc(parameter, - reduceSinkInputRowResolver)); + ExprNodeDesc expr = genExprNodeDesc(parameter, + reduceSinkInputRowResolver); + reduceValues.add(expr); outputValueColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." + getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field, + ColumnInfo colInfo = new ColumnInfo(field, reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, - false)); + false); + reduceSinkOutputRowResolver.putExpression(parameter, colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } } @@ -2945,14 +2958,18 @@ TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get( inputField).getType(); - reduceValues.add(new ExprNodeColumnDesc(type, - getColumnInternalName(inputField), "", false)); + ExprNodeDesc expr = new ExprNodeColumnDesc(type, + getColumnInternalName(inputField), "", false); + reduceValues.add(expr); inputField++; outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." + getColumnInternalName(reduceValues.size() - 1); + ColumnInfo colInfo = new ColumnInfo(field, + type, null, false); reduceSinkOutputRowResolver.putExpression(entry.getValue(), - new ColumnInfo(field, type, null, false)); + colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } @@ -5897,7 +5914,8 @@ // insert a select operator here used by the ColumnPruner to reduce // the data to shuffle curr = insertSelectAllPlanForGroupBy(dest, curr); - if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + !conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { curr = genGroupByPlanMapAggr1MR(dest, qb, curr); } else { @@ -5906,7 +5924,19 @@ } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { curr = genGroupByPlan2MR(dest, qb, curr); } else { + Operator mapSide = null; + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { + mapSide = genGroupByPlanMapAggr1MR(dest, qb, curr); + mapSide = (Operator)((Operator)mapSide.getParentOperators().get(0)).getParentOperators().get(0); + curr.getChildOperators().remove(mapSide); + } curr = genGroupByPlan1MR(dest, qb, curr); + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { + groupbyRegular2MapSide.put((ReduceSinkOperator )curr.getParentOperators().get(0), + (GroupByOperator)mapSide); + } } } @@ -7298,7 +7328,8 @@ opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, + groupbyRegular2MapSide); Optimizer optm = new Optimizer(); optm.setPctx(pCtx); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0) +++ 
ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0) @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.CorrelationManualForwardOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; + + +/** + * Correlation composite operator Descriptor implementation. + * + */ +@Explain(displayName = "Correlation Composite Operator") +public class CorrelationCompositeDesc implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private ReduceSinkOperator correspondingReduceSinkOperator; + + public CorrelationCompositeDesc(){ + + } + + public CorrelationCompositeDesc(ReduceSinkOperator correspondingReduceSinkOperator){ + this.correspondingReduceSinkOperator = correspondingReduceSinkOperator; + } + + public void setCorrespondingReduceSinkOperator( + ReduceSinkOperator correspondingReduceSinkOperator){ + this.correspondingReduceSinkOperator = correspondingReduceSinkOperator; + } + + public ReduceSinkOperator getCorrespondingReduceSinkOperator(){ + return correspondingReduceSinkOperator; + } + + private int[] allOperationPathTags; + private ArrayList> bottomInternalOperators = new ArrayList>(); + private ArrayList correlationManualForwardOperators = new ArrayList();; + + public void setCorrelationManualForwardOperators( + ArrayList correlationManualForwardOperators){ + this.correlationManualForwardOperators = correlationManualForwardOperators; + } + + public ArrayList getCorrelationManualForwardOperators(){ + return correlationManualForwardOperators; + } + + public void setBottomInternalOperators(ArrayList> bottomInternalOperators){ + this.bottomInternalOperators = bottomInternalOperators; + } + + public ArrayList> getBottomInternalOperators(){ + return bottomInternalOperators; + } + + public void setAllOperationPathTags(int[] allOperationPathTags){ + this.allOperationPathTags = allOperationPathTags; + } + + public int[] getAllOperationPathTags(){ + return allOperationPathTags; + } + + + public void setInternalNodes(ArrayList> bottomInternalOperators, + ArrayList> topInternalOperators, int[] allOperationPathTags){ + this.allOperationPathTags = allOperationPathTags; + // the size of bottomInternalOperators and topInternalOperators should be same. 
+ this.bottomInternalOperators.addAll(bottomInternalOperators); + for (Operator op: topInternalOperators){ + CorrelationManualForwardOperator cmfo = new CorrelationManualForwardOperator(); + cmfo.setChildOperators(null); + correlationManualForwardOperators.add(cmfo); + List> newChildren = + new ArrayList>(); + newChildren.add(cmfo); + op.setChildOperators(newChildren); + cmfo.setParentOperators(Utilities.makeList(op)); + } + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy) @@ -44,7 +44,14 @@ * @param hiveConf */ public void initialize(HiveConf hiveConf) { + CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer(); transformations = new ArrayList(); + // Add correlation optimizer for first phase query plan tree analysis + // THe first phase will record original opColumnExprMap, opParseCtx, opRowResolver, + // since these may be changed during optimization + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)) { + transformations.add(correlationOptimizer); + } // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) { @@ -74,6 +81,10 @@ if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) { transformations.add(new ReduceSinkDeDuplication()); } + // Correlation discovery and query plan tree transformation phase + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)) { + transformations.add(correlationOptimizer); + } } /** Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (working copy) @@ -23,6 +23,10 @@ import java.util.List; import org.apache.hadoop.hive.ql.plan.CollectDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationManualForwardDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; import org.apache.hadoop.hive.ql.plan.ExtractDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; @@ -91,6 +95,14 @@ HashTableDummyOperator.class)); opvec.add(new OpTuple(HashTableSinkDesc.class, HashTableSinkOperator.class)); + opvec.add(new OpTuple(CorrelationCompositeDesc.class, + CorrelationCompositeOperator.class)); + opvec.add(new OpTuple(CorrelationManualForwardDesc.class, + CorrelationManualForwardOperator.class)); + opvec.add(new OpTuple(CorrelationReducerDispatchDesc.class, + CorrelationReducerDispatchOperator.class)); + opvec.add(new OpTuple(CorrelationFakeReduceSinkDesc.class, + CorrelationFakeReduceSinkOperator.class)); } public static Operator get(Class opClass) { Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java (revision 0) @@ 
-0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +/** + * Correlation Manual ForwardDesc. + * + */ +@Explain(displayName = "Correlation Manual Forward") +public class CorrelationManualForwardDesc implements Serializable { + private static final long serialVersionUID = 1L; + + public CorrelationManualForwardDesc() { + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java (revision 0) @@ -0,0 +1,391 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * A fake reduce sink operator sends its output directly to another operator (e.g. JOIN or GBY). + * CorrelationFakeReduceSinkOperator is used only in the reduce phase. + * Basically, it is a bridge from one JOIN or GBY operator to another JOIN or GBY operator. + * A CorrelationFakeReduceSinkOperator takes care of the startGroup and endGroup actions of its + * succeeding JOIN or GBY operator. + **/ +public class CorrelationFakeReduceSinkOperator extends Operator + implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * The evaluators for the key columns. Key columns decide the sort order on + * the reducer side. Key columns are passed to the reducer in the "key". + */ + protected transient ExprNodeEvaluator[] keyEval; + /** + * The evaluators for the value columns. Value columns are passed to reducer + * in the "value". + */ + protected transient ExprNodeEvaluator[] valueEval; + /** + * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in + * Hive language). Partition columns decide the reducer that the current row + * goes to. Partition columns are not passed to reducer. 
+ */ + + transient Serializer keySerializer; + transient boolean keyIsText; + transient Serializer valueSerializer; + + transient TableDesc keyTableDesc; + transient TableDesc valueTableDesc; + + transient Deserializer inputKeyDeserializer; + + transient SerDe inputValueDeserializer; + + transient int tag; + transient byte[] tagByte; + transient ByteWritable tagWritable; + transient protected int numDistributionKeys; + transient protected int numDistinctExprs; + + transient InspectableObject tempInspectableObject; + transient HiveKey keyWritable; + transient Writable value; + + transient StructObjectInspector keyObjectInspector; + transient StructObjectInspector valueObjectInspector; + transient ObjectInspector[] partitionObjectInspectors; + + transient ObjectInspector outputKeyObjectInspector; + transient ObjectInspector outputValueObjectInspector; + transient ObjectInspector[] outputPartitionObjectInspectors; + + + //transient BytesWritable groupKey; + transient Object[][] cachedKeys; + transient Object[] cachedValues; + transient List> distinctColIndices; + + transient Random random; + + private ArrayList forwardedRow; + private Object keyObject; + private Object valueObject; + + private static String[] fieldNames; + + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + forwardedRow = new ArrayList(3); + tagByte = new byte[1]; + tagWritable = new ByteWritable(); + tempInspectableObject = new InspectableObject(); + keyWritable = new HiveKey(); + assert childOperatorsArray.length == 1; + try { + keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getKeyCols()) { + keyEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + numDistributionKeys = conf.getNumDistributionKeys(); + distinctColIndices = conf.getDistinctColumnIndices(); + numDistinctExprs = distinctColIndices.size(); + + valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getValueCols()) { + valueEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + tag = conf.getTag(); + tagByte[0] = (byte) tag; + tagWritable.set(tagByte[0]); + LOG.info("Using tag = " + tag); + + TableDesc keyTableDesc = conf.getKeySerializeInfo(); + keySerializer = (Serializer) keyTableDesc.getDeserializerClass() + .newInstance(); + keySerializer.initialize(null, keyTableDesc.getProperties()); + keyIsText = keySerializer.getSerializedClass().equals(Text.class); + + inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + .getDeserializerClass(), null); + inputKeyDeserializer.initialize(null, keyTableDesc.getProperties()); + outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector(); + + TableDesc valueTableDesc = conf.getValueSerializeInfo(); + valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() + .newInstance(); + valueSerializer.initialize(null, valueTableDesc.getProperties()); + + inputValueDeserializer = (SerDe) ReflectionUtils.newInstance( + valueTableDesc.getDeserializerClass(), null); + inputValueDeserializer.initialize(null, valueTableDesc + .getProperties()); + outputValueObjectInspector = inputValueDeserializer.getObjectInspector(); + + ObjectInspector rowInspector = inputObjInspectors[0]; + + keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, + 
distinctColIndices, + conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); + valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf + .getOutputValueColumnNames(), rowInspector); + int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; + int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : + numDistributionKeys; + cachedKeys = new Object[numKeys][keyLen]; + cachedValues = new Object[valueEval.length]; + assert cachedKeys.length == 1; + + ArrayList ois = new ArrayList(); + ois.add(outputKeyObjectInspector); + ois.add(outputValueObjectInspector); + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + + LOG.info("Fake ReduceSink inputObjInspectors" + + ((StructObjectInspector) inputObjInspectors[0]).getTypeName()); + + LOG.info("Fake ReduceSink outputObjInspectors " + + this.getChildOperators().get(0).getParentOperators().indexOf(this) + + " " + ((StructObjectInspector) outputObjInspector).getTypeName()); + + initializeChildren(hconf); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + /** + * Initializes array of ExprNodeEvaluator. Adds Union field for distinct + * column indices for group by. + * Puts the return values into a StructObjectInspector with output column + * names. + * + * If distinctColIndices is empty, the object inspector is same as + * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} + */ + protected static StructObjectInspector initEvaluatorsAndReturnStruct( + ExprNodeEvaluator[] evals, List> distinctColIndices, + List outputColNames, + int length, ObjectInspector rowInspector) + throws HiveException { + int inspectorLen = evals.length > length ? 
length + 1 : evals.length; + List sois = new ArrayList(inspectorLen); + + // keys + ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector); + sois.addAll(Arrays.asList(fieldObjectInspectors)); + + if (evals.length > length) { + // union keys + List uois = new ArrayList(); + for (List distinctCols : distinctColIndices) { + List names = new ArrayList(); + List eois = new ArrayList(); + int numExprs = 0; + for (int i : distinctCols) { + names.add(HiveConf.getColumnInternalName(numExprs)); + eois.add(evals[i].initialize(rowInspector)); + numExprs++; + } + uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois)); + } + UnionObjectInspector uoi = + ObjectInspectorFactory.getStandardUnionObjectInspector(uois); + sois.add(uoi); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois ); + } + + private BytesWritable groupKey; + + @Override + public void processOp(Object row, int tag) throws HiveException { + + try { + // Evaluate the value + for (int i = 0; i < valueEval.length; i++) { + cachedValues[i] = valueEval[i].evaluate(row); + } + // Serialize the value + value = valueSerializer.serialize(cachedValues, valueObjectInspector); + valueObject = inputValueDeserializer.deserialize(value); + + // Evaluate the keys + Object[] distributionKeys = new Object[numDistributionKeys]; + for (int i = 0; i < numDistributionKeys; i++) { + distributionKeys[i] = keyEval[i].evaluate(row); + } + + if (numDistinctExprs > 0) { + // with distinct key(s) + for (int i = 0; i < numDistinctExprs; i++) { + System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys); + Object[] distinctParameters = + new Object[distinctColIndices.get(i).size()]; + for (int j = 0; j < distinctParameters.length; j++) { + distinctParameters[j] = + keyEval[distinctColIndices.get(i).get(j)].evaluate(row); + } + cachedKeys[i][numDistributionKeys] = + new StandardUnion((byte)i, distinctParameters); + } + } else { + // no distinct key + System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys); + } + + + for (int i = 0; i < cachedKeys.length; i++) { + + if (keyIsText) { + Text key = (Text) keySerializer.serialize(cachedKeys[i], + keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + + } else { + // Must be BytesWritable + BytesWritable key = (BytesWritable) keySerializer.serialize( + cachedKeys[i], keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + } + + if (!keyWritable.equals(groupKey)) { + try { + keyObject = inputKeyDeserializer.deserialize(keyWritable); + } catch (Exception e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input key from " + + Utilities.formatBinaryString(keyWritable.get(), 0, + keyWritable.getSize()) + " with properties " + + keyTableDesc.getProperties(), e); + } + if (groupKey == null) { // the first group + groupKey = new BytesWritable(); + } else { + // if its child has not been ended, end it + if(!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())){ + childOperatorsArray[0].endGroup(); + } + } + groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); + if(!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())){ + childOperatorsArray[0].startGroup(); + childOperatorsArray[0].setGroupKeyObject(keyObject); + childOperatorsArray[0].setBytesWritableGroupKey(groupKey); + } + + } + + forwardedRow.clear(); + forwardedRow.add(keyObject); + forwardedRow.add(valueObject); + 
forwardedRow.add(tagWritable); + forward(forwardedRow, outputObjInspector); + } + } catch (SerDeException e1) { + e1.printStackTrace(); + } + + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if(!abort){ + //if(childOperatorsArray[0].getNumOfClosedParentOperators() == childOperatorsArray[0].getParentOperators().size() - 1){ + if(childOperatorsArray[0].allInitializedParentsAreClosed()) { + childOperatorsArray[0].endGroup(); + } + } + } + + @Override + public void startGroup() throws HiveException { + // do nothing + } + + @Override + public void endGroup() throws HiveException { + // do nothing + } + + @Override + public void setGroupKeyObject(Object keyObject) { + // do nothing + + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CFReduceSink"); + } + + @Override + public OperatorType getType() { + return null; + } +} Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1210283) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -403,6 +403,7 @@ HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), + HIVEOPTCORRELATION("hive.optimize.correlation", false), // exploit intra-query correlations // Indexes HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 1210283) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy) @@ -279,6 +279,7 @@ private void populateMapRedPlan3(Table src, Table src2) throws SemanticException { mr.setNumReduceTasks(Integer.valueOf(5)); mr.setNeedsTagging(true); + mr.setNeedsOperationPathTagging(false); ArrayList outputColumns = new ArrayList(); for (int i = 0; i < 2; i++) { outputColumns.add("_col" + i); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -60,16 +60,16 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; -import 
org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; /** * General utility common functions for the Processor to convert operator into @@ -114,8 +114,14 @@ } if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } + if (op.getConf().getNeedsOperationPathTagging()) { + plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(true); + } + assert currTopOp != null; List> seenOps = opProcCtx.getSeenOps(); String currAliasId = opProcCtx.getCurrAliasId(); @@ -178,6 +184,7 @@ opTaskMap.put(reducer, currTask); if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf(); plan.setNumReduceTasks(desc.getNumReducers()); @@ -308,6 +315,7 @@ if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } initUnionPlan(opProcCtx, currTask, false); @@ -329,7 +337,7 @@ if ((taskTmpDirLst == null) || (taskTmpDirLst.isEmpty())) { return; } - + List tt_descLst = uCtx.getTTDesc(); assert !taskTmpDirLst.isEmpty() && !tt_descLst.isEmpty(); assert taskTmpDirLst.size() == tt_descLst.size(); @@ -337,7 +345,7 @@ assert local == false; List> topOperators = uCtx.getListTopOperators(); - + for (int pos = 0; pos < size; pos++) { String taskTmpDir = taskTmpDirLst.get(pos); TableDesc tt_desc = tt_descLst.get(pos); @@ -989,6 +997,7 @@ // dependent on the redTask if (reducer.getClass() == JoinOperator.class) { cplan.setNeedsTagging(true); + cplan.setNeedsOperationPathTagging(false); } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy) @@ -72,6 +72,7 @@ private Long minSplitSizePerRack; private boolean needsTagging; + private boolean needsOperationPathTagging; private boolean hadoopSupportsSplittable; private MapredLocalWork mapLocalWork; @@ -338,6 +339,16 @@ this.needsTagging = needsTagging; } + // TODO: include "Needs Operation Paths Tagging: false" into correct results + // @Explain(displayName = "Needs Operation Paths Tagging", normalExplain = false) + public boolean getNeedsOperationPathTagging() { + return needsOperationPathTagging; + } + + public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) { + this.needsOperationPathTagging = needsOperationPathTagging; + } + public boolean getHadoopSupportsSplittable() { return hadoopSupportsSplittable; } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (revision 0) @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; + +/** + * Correlation dispatch operator implementation. + * If used, CorrelationReducerDispatchOperator is the first operator in reduce phase. + * It will dispatch the record to corresponding JOIN or GBY operators. + * Suppose there are n children of this dispatch operator, a input record will be + * evaluated by n DispatchHandler that is used to select the corresponding parts of a record + * and then forward to succeeding JOIN or GBY operators. + */ +public class CorrelationReducerDispatchOperator extends Operator implements Serializable{ + + private static final long serialVersionUID = 1L; + private static String[] fieldNames; + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + protected static class DispatchHandler { + + protected Log l4j = LogFactory.getLog(this.getClass().getName()); + + private final ObjectInspector[] inputObjInspector; + private ObjectInspector outputObjInspector; + private ObjectInspector keyObjInspector; + private ObjectInspector valueObjInspector; + private final byte inputTag; + private final byte outputTag; + private final byte childIndx; + private final ByteWritable outputTagByteWritable; + private final SelectDesc selectDesc; + private final SelectDesc keySelectDesc; + private ExprNodeEvaluator[] keyEval; + private ExprNodeEvaluator[] eval; + + // counters for debugging + private transient long cntr = 0; + private transient long nextCntr = 1; + + private long getNextCntr(long cntr) { + // A very simple counter to keep track of number of rows processed by an + // operator. 
It dumps + // every 1 million times, and quickly before that + if (cntr >= 1000000) { + return cntr + 1000000; + } + + return 10 * cntr; + } + + public long getCntr() { + return this.cntr; + } + + private final Log LOG; + private final boolean isLogInfoEnabled; + private final String id; + + public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx, byte outputTag, + SelectDesc selectDesc, SelectDesc keySelectDesc, Log LOG, String id) + throws HiveException { + this.inputObjInspector = inputObjInspector; + assert this.inputObjInspector.length == 1; + this.inputTag = inputTag; + this.childIndx = childIndx; + this.outputTag = outputTag; + this.selectDesc = selectDesc; + this.keySelectDesc = keySelectDesc; + this.outputTagByteWritable = new ByteWritable(outputTag); + this.LOG = LOG; + this.isLogInfoEnabled = LOG.isInfoEnabled(); + this.id = id; + init(); + } + + private void init() throws HiveException { + ArrayList ois = new ArrayList(); + if (keySelectDesc.isSelStarNoCompute()) { + ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(0)); + } else { + ArrayList colList = this.keySelectDesc.getColList(); + keyEval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + keyObjInspector = initEvaluatorsAndReturnStruct(keyEval, keySelectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(0).getFieldObjectInspector()); + + ois.add(keyObjInspector); + l4j.info("Key: input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); + + } + if (selectDesc.isSelStarNoCompute()) { + ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(1)); + } else { + ArrayList colList = this.selectDesc.getColList(); + eval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + valueObjInspector = initEvaluatorsAndReturnStruct(eval, selectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(1).getFieldObjectInspector()); + + ois.add(valueObjInspector); + l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); //Yin + + } + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT outputObjInspector" + + ((StructObjectInspector) outputObjInspector).getTypeName()); //Yin + } + + public ObjectInspector getOutputObjInspector() { + return outputObjInspector; + } + + public Object process(Object row) throws HiveException { + Object[] keyOutput = new Object[keyEval.length]; + Object[] valueOutput = new Object[eval.length]; + ArrayList outputRow = new ArrayList(3); + List thisRow = (List)row; + if (keySelectDesc.isSelStarNoCompute()) { + outputRow.add(thisRow.get(0)); + } else { + Object key = thisRow.get(0); + for (int j = 0; j < keyEval.length; j++) { + try { + keyOutput[j] = keyEval[j].evaluate(key); + + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) 
{ + throw new HiveException("Error evaluating " + + keySelectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(keyOutput); + } + + if (selectDesc.isSelStarNoCompute()) { + outputRow.add(thisRow.get(1)); + } else { + Object value = thisRow.get(1); + for (int j = 0; j < eval.length; j++) { + try { + valueOutput[j] = eval[j].evaluate(value); + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new HiveException("Error evaluating " + + selectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(valueOutput); + } + outputRow.add(outputTagByteWritable); + + if (isLogInfoEnabled) { + cntr++; + if (cntr == nextCntr) { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarding " + cntr + " rows"); + nextCntr = getNextCntr(cntr); + } + } + + return outputRow; + } + + public void printCloseOpLog() { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarded " + cntr + " rows"); + } + + } + + //inputTag->(Child->List) + private HashMap>> dispatchConf; + //inputTag->(Child->List) + private HashMap>> dispatchValueSelectDescConf; + //inputTag->(Child->List) + private HashMap>> dispatchKeySelectDescConf; + //inputTag->(Child->List) + private HashMap>> dispatchHandlers; + //Child->(outputTag->DispatchHandler) + private HashMap> child2OutputTag2DispatchHandlers; + //Child->Child's inputObjInspectors + private HashMap childInputObjInspectors; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + + dispatchConf = conf.getDispatchConf(); + dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf(); + dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf(); + dispatchHandlers = new HashMap>>(); + for (Entry>> entry: dispatchConf.entrySet()) { + HashMap> tmp = new HashMap>(); + for (Entry> child2outputTag: entry.getValue().entrySet()) { + tmp.put(child2outputTag.getKey(), new ArrayList()); + int indx = 0; + for (Integer outputTag: child2outputTag.getValue()) { + tmp.get(child2outputTag.getKey()).add( + new DispatchHandler(new ObjectInspector[]{inputObjInspectors[entry.getKey()]}, + entry.getKey().byteValue(), child2outputTag.getKey().byteValue(), outputTag.byteValue(), + dispatchValueSelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), + dispatchKeySelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), LOG, id)); + indx++; + } + } + dispatchHandlers.put(entry.getKey(), tmp); + } + + child2OutputTag2DispatchHandlers = new HashMap>(); + for (Entry>> entry: dispatchConf.entrySet()) { + for (Entry> child2outputTag: entry.getValue().entrySet()) { + if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) { + child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(), new HashMap()); + } + int indx = 0; + for (Integer outputTag: child2outputTag.getValue()) { + child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()). 
+ put(outputTag, dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx)); + indx++; + } + + } + } + + childInputObjInspectors = new HashMap(); + for (Entry> entry: + child2OutputTag2DispatchHandlers.entrySet()) { + Integer l = Collections.max(entry.getValue().keySet()); + ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1]; + for (Entry e: entry.getValue().entrySet()) { + if (e.getKey().intValue() == -1) { + assert childObjInspectors.length == 1; + childObjInspectors[0] = e.getValue().getOutputObjInspector(); + } else { + childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector(); + } + } + childInputObjInspectors.put(entry.getKey(), childObjInspectors); + } + + initializeChildren(hconf); + } + + //Each child should has its own outputObjInspector + @Override + protected void initializeChildren(Configuration hconf) throws HiveException { + state = State.INIT; + LOG.info("Operator " + id + " " + getName() + " initialized"); + if (childOperators == null) { + return; + } + LOG.info("Initializing children of " + id + " " + getName()); + for (int i = 0; i < childOperatorsArray.length; i++) { + LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " + + childOperatorsArray[i].getName() + + " " + childInputObjInspectors.get(i).length); + childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i)); + if (reporter != null) { + childOperatorsArray[i].setReporter(reporter); + } + } + } + + private int opTags; + private int inputTag; + + @Override + public void processOp(Object row, int tag) throws HiveException { + ArrayList thisRow = (ArrayList)row; + assert thisRow.size() == 4; + opTags = ((ByteWritable)thisRow.get(3)).get(); + inputTag = (int)((ByteWritable)thisRow.get(2)).get(); + forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]); + } + + @Override + public void forward(Object row, ObjectInspector rowInspector) + throws HiveException { + if ((++outputRows % 1000) == 0) { + if (counterNameToEnum != null) { + incrCounter(numOutputRowsCntr, outputRows); + outputRows = 0; + } + } + + if (childOperatorsArray == null && childOperators != null) { + throw new HiveException( + "Internal Hive error during operator initialization."); + } + + if ((childOperatorsArray == null) || (getDone())) { + return; + } + + int childrenDone = 0; + int forwardFLag = 1; + assert childOperatorsArray.length <= 8; + for (int i = 0; i < childOperatorsArray.length; i++) { + Operator o = childOperatorsArray[i]; + if (o.getDone()) { + childrenDone++; + } else { + if ((opTags & (forwardFLag << i)) != 0){ + for(int j = 0; j> childIndx2DispatchHandlers: + dispatchHandlers.values()) { + for (ArrayList dispatchHandlers: childIndx2DispatchHandlers.values()) { + for (DispatchHandler dispatchHandler: dispatchHandlers) { + dispatchHandler.printCloseOpLog(); + } + } + } + } + + @Override + public void setGroupKeyObject(Object keyObject) { + this.groupKeyObject = keyObject; + for (Operator op : childOperators) { + op.setGroupKeyObject(keyObject); + } + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CDP"); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 
0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0) @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.io.LongWritable; + +/** + * Correlation composite operator implementation. + * This operator is used only in the map phase. + * When multiple sub-queries involve a common table, CorrelationCompositeOperator + * is used so that the single table scan can be shared among them. + * For example, suppose that the common table is T and that predicates P1 and P2 are used + * in sub-queries SQ1 and SQ2, respectively. CorrelationCompositeOperator + * applies P1 and P2 to each record and tags the record according to whether P1 or P2 holds.
+ **/ +public class CorrelationCompositeOperator extends Operator implements Serializable { + + static final private Log LOG = LogFactory.getLog(CorrelationCompositeOperator.class.getName()); + static final private LogHelper console = new LogHelper(LOG); + + public static enum Counter { + FILTERED, PASSED + } + + /** + * + */ + private static final long serialVersionUID = 1L; + + private ArrayList> bottomInternalOperators; + private ArrayList correlationManualForwardOperators; + + private ReduceSinkOperator correspondingReduceSinkOperators; + + private transient final LongWritable filtered_count, passed_count; + + + public CorrelationCompositeOperator() { + super(); + filtered_count = new LongWritable(); + passed_count = new LongWritable(); + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CCO"); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator(); + bottomInternalOperators = conf.getBottomInternalOperators(); + correlationManualForwardOperators = conf.getCorrelationManualForwardOperators(); + allOperationPathTags = conf.getAllOperationPathTags(); + statsMap.put(Counter.FILTERED, filtered_count); + statsMap.put(Counter.PASSED, passed_count); + // initialize internal operators + for (Operator op:bottomInternalOperators) { + op.initialize(hconf, inputObjInspectors); + } + //initialize its children + initializeChildren(hconf); + } + + private int[] allOperationPathTags; + + @Override + public void processOp(Object row, int tag) throws HiveException { + ArrayList operationPathTags = new ArrayList(); + boolean isForward = false; + for (Operator op: bottomInternalOperators){ + op.process(row, tag); + } + ArrayList rows = new ArrayList(); + int[] tags = new int[bottomInternalOperators.size()]; + int i = 0; + for (CorrelationManualForwardOperator ymf: correlationManualForwardOperators) { + rows.add(ymf.getRow()); + tags[i] = ymf.getTag(); + i++; + } + if (rows.size() > 0){ + i = 0; + for (Object r: rows){ + if (r != null){ + operationPathTags.add(allOperationPathTags[i]); + isForward = true; + } + i++; + } + assert correspondingReduceSinkOperators != null; + correspondingReduceSinkOperators.setOperationPathTags(operationPathTags); + } + if (isForward) { + passed_count.set(passed_count.get() + 1); + forward(row, inputObjInspectors[tag]); + } else { + filtered_count.set(filtered_count.get() + 1); + } + for (CorrelationManualForwardOperator ymf: correlationManualForwardOperators) { + ymf.clean(); + } + + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 1210283) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.OutputCollector; @@ -512,7 +513,7 @@ LOG.debug("End group Done"); } - protected boolean allInitializedParentsAreClosed() { + public boolean
allInitializedParentsAreClosed() { if (parentOperators != null) { for (Operator parent : parentOperators) { if(parent==null){ @@ -1335,4 +1336,20 @@ public void cleanUpInputFileChangedOp() throws HiveException { } + // bytesWritableGroupKey is only used when a query plan is optimized by CorrelationOptimizer. + // CorrelationFakeReduceSinkOperator will use this variable to determine when it needs to start or end the group + // for its child operator. + protected BytesWritable bytesWritableGroupKey; + + public void setBytesWritableGroupKey(BytesWritable groupKey) { + if (bytesWritableGroupKey == null) { + bytesWritableGroupKey = new BytesWritable(); + } + bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize()); + } + + public BytesWritable getBytesWritableGroupKey() { + return bytesWritableGroupKey; + } + }
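Note (editorial illustration, not part of the patch): the reduce-side dispatch introduced above relies on two extra fields appended to each row — an input tag and an operation-path tag whose bits mark which child operators (the JOIN or GROUP BY branches of the correlated sub-queries) should receive the record. The standalone Java sketch below mirrors the bitmask test used in CorrelationReducerDispatchOperator.forward(); the names PathDispatcherSketch, Child, and routeRow are illustrative only and do not appear in the patch, and the row layout is simplified to [key, value, inputTag, operationPathTag].

// Editorial sketch only: shows how an operation-path bitmask can route one
// reduce-side record to a subset of child operators, in the spirit of
// CorrelationReducerDispatchOperator.forward(). Not code from the patch.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PathDispatcherSketch {

  // Stand-in for a child operator (e.g. a JOIN or GROUP BY branch).
  interface Child {
    void process(List<Object> row);
  }

  // Assumed row layout: [key, value, inputTag, operationPathTag], where bit i
  // of operationPathTag is set when child i should see the record.
  static void routeRow(List<Object> row, Child[] children) {
    int opTags = ((Number) row.get(3)).intValue();
    List<Object> forwarded = row.subList(0, 3); // strip the operation-path tag
    for (int i = 0; i < children.length; i++) {
      if ((opTags & (1 << i)) != 0) {
        children[i].process(forwarded);
      }
    }
  }

  public static void main(String[] args) {
    Child joinBranch = new Child() {
      public void process(List<Object> row) {
        System.out.println("JOIN branch got " + row);
      }
    };
    Child gbyBranch = new Child() {
      public void process(List<Object> row) {
        System.out.println("GROUP BY branch got " + row);
      }
    };

    // A record tagged for both operation paths (bits 0 and 1 set).
    List<Object> row = new ArrayList<Object>(
        Arrays.asList((Object) "key1", "value1", (byte) 0, 3));
    routeRow(row, new Child[] {joinBranch, gbyBranch});
  }
}

In the patch itself the record is additionally projected per child by a DispatchHandler (separate key and value SELECT descriptors) before being forwarded, and it is the map-side CorrelationCompositeOperator that sets the operation-path tag via setOperationPathTags() on its corresponding ReduceSinkOperator; the sketch above only isolates the routing step.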