### Eclipse Workspace Patch 1.0 #P hive-trunk-HIVE-2206-new Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.LineageInfo; @@ -80,7 +81,8 @@ // reducer private Map> groupOpToInputTables; private Map prunedPartitions; - + + Map groupbyRegular2MapSide; /** * The lineage information. */ @@ -157,7 +159,8 @@ HashMap opToSamplePruner, SemanticAnalyzer.GlobalLimitCtx globalLimitCtx, HashMap nameToSplitSample, - HashSet semanticInputs, List> rootTasks) { + HashSet semanticInputs, List> rootTasks, + Map groupbyRegular2MapSide) { this.conf = conf; this.qb = qb; this.ast = ast; @@ -183,6 +186,7 @@ this.globalLimitCtx = globalLimitCtx; this.semanticInputs = semanticInputs; this.rootTasks = rootTasks; + this.groupbyRegular2MapSide = groupbyRegular2MapSide; } /** @@ -529,4 +533,8 @@ this.rootTasks.remove(rootTask); this.rootTasks.addAll(tasks); } + + public Map getGroupbyRegular2MapSide() { + return groupbyRegular2MapSide; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0) @@ -0,0 +1,801 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QB; +import org.apache.hadoop.hive.ql.parse.QBExpr; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +/** + * Implementation of rule-based correlation optimizer. The optimization is based on + * the paper "YSmart: Yet Another SQL-to-MapReduce Translator" + * (http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-7.pdf). + * This optimizer detects three kinds of + * correlations, Input Correlation (IC), Transit Correlation (TC) and Job Flow Correlation (JFC). 
+ * After the optimization, the structure of a plan tree should be
+ * TS->CCO->SEL (selects the union of the columns used by the correlated operators)->CRS->CDP->operators in the reduce phase.
+ */
+
+public class CorrelationOptimizer implements Transform {
+
+  static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName());
+  private final HashMap AliastoTabName;
+  private final HashMap AliastoTab;
+
+  public CorrelationOptimizer(){
+    super();
+    AliastoTabName = new HashMap();
+    AliastoTab = new HashMap();
+    pGraphContext = null;
+  }
+
+  private void initializeAliastoTabNameMapping(QB qb){
+    for (String alias: qb.getAliases()){
+      AliastoTabName.put(alias, qb.getTabNameForAlias(alias));
+      AliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias));
+    }
+    for (String subqalias: qb.getSubqAliases()){
+      QBExpr qbexpr = qb.getSubqForAlias(subqalias);
+      initializeAliastoTabNameMapping(qbexpr.getQB());
+    }
+  }
+
+  public static ExprNodeColumnDesc getStringColumn(String columnName) {
+    return new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, columnName,
+        "", false);
+  }
+
+  protected ParseContext pGraphContext;
+  private LinkedHashMap, OpParseContext> opParseCtx;
+  private final LinkedHashMap, OpParseContext> originalOpParseCtx =
+      new LinkedHashMap, OpParseContext>();
+  private final LinkedHashMap, RowResolver> originalOpRowResolver =
+      new LinkedHashMap, RowResolver>();
+  private final LinkedHashMap, Map> originalOpColumnExprMap =
+      new LinkedHashMap, Map>();
+
+  private boolean isPhase1 = true;
+
+  private Map groupbyRegular2MapSide;
+
+  /**
+   * Transform the query tree. First, detect correlations among operators;
+   * then group the correlated operators together.
+   * @param pctx
+   *          current parse context
+   */
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    if (isPhase1){
+
+      pGraphContext = pctx;
+      opParseCtx = pctx.getOpParseCtx();
+
+      CorrelationNodePhase1ProcCtx correlationCtx = new CorrelationNodePhase1ProcCtx();
+
+      Map opRules = new LinkedHashMap();
+
+      Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules, correlationCtx);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      // Create a list of topop nodes
+      ArrayList topNodes = new ArrayList();
+      topNodes.addAll(pGraphContext.getTopOps().values());
+      ogw.startWalking(topNodes, null);
+      isPhase1 = false;
+
+    }else{
+
+      /* Types of correlations:
+       * 1) Input Correlation: Multiple nodes have input correlation (IC)
+       *    if their input relation sets are not disjoint;
+       * 2) Transit Correlation: Multiple nodes have transit correlation (TC)
+       *    if they have not only input correlation, but also the same partition key;
+       * 3) Job Flow Correlation: A node has job flow correlation (JFC) with one of
+       *    its child nodes if it has the same partition key as that child node.
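
As a standalone illustration of these three definitions (toy Java only — the PlanNode class, table names, and keys below are invented for the demo and are not part of this patch or of Hive):

import java.util.*;

// Illustrative only: a node stands in for one MapReduce-bound operator's view of the plan.
class PlanNode {
    final Set<String> inputTables;   // tables feeding this node
    final List<String> partitionKey; // columns the node partitions on
    PlanNode child;                  // downstream node, if any

    PlanNode(Set<String> inputTables, List<String> partitionKey) {
        this.inputTables = inputTables;
        this.partitionKey = partitionKey;
    }

    // Input Correlation: input relation sets are not disjoint.
    static boolean hasInputCorrelation(PlanNode a, PlanNode b) {
        Set<String> shared = new HashSet<>(a.inputTables);
        shared.retainAll(b.inputTables);
        return !shared.isEmpty();
    }

    // Transit Correlation: input correlation plus the same partition key.
    static boolean hasTransitCorrelation(PlanNode a, PlanNode b) {
        return hasInputCorrelation(a, b) && a.partitionKey.equals(b.partitionKey);
    }

    // Job Flow Correlation: a node partitions on the same key as its child.
    boolean hasJobFlowCorrelationWithChild() {
        return child != null && partitionKey.equals(child.partitionKey);
    }
}

class CorrelationDemo {
    public static void main(String[] args) {
        PlanNode agg = new PlanNode(Set.of("lineitem"), List.of("l_orderkey"));
        PlanNode join = new PlanNode(Set.of("lineitem", "orders"), List.of("l_orderkey"));
        agg.child = join;
        System.out.println("IC:  " + PlanNode.hasInputCorrelation(agg, join));   // true: both read lineitem
        System.out.println("TC:  " + PlanNode.hasTransitCorrelation(agg, join)); // true: same partition key
        System.out.println("JFC: " + agg.hasJobFlowCorrelationWithChild());      // true: key matches the child
    }
}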
+ * */ + + pGraphContext = pctx; + opParseCtx = pctx.getOpParseCtx(); + + groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide(); + + initializeAliastoTabNameMapping(pGraphContext.getQB()); + + // 1: find out correlation + CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx(); + + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "RS%"), new CorrelationNodeProc()); + + Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + + // 2: transform the query plan tree + LOG.info("Begain query plan transformation based on intra-query correlations"); + for(IntraQueryCorrelation correlation: correlationCtx.getCorrelations()){ + pGraphContext = CorrelationOptimizerUtils.applyCorrelation( + correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver, + groupbyRegular2MapSide, originalOpParseCtx); + } + LOG.info("Finish query plan transformation based on intra-query correlations"); + + } + + return pGraphContext; + } + + + + private NodeProcessor getPhase1DefaultProc(){ + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + Operator op = (Operator)nd; + OpParseContext opCtx= opParseCtx.get(op); + + if(op.getColumnExprMap() != null){ + originalOpColumnExprMap.put(op, op.getColumnExprMap()); + } + originalOpParseCtx.put(op, opCtx); + originalOpRowResolver.put(op, opCtx.getRowResolver()); + + return null; + } + }; + } + + private class CorrelationNodeProc implements NodeProcessor { + + public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop){ + Operator op = rsop.getChildOperators().get(0); + while(!op.getName().equals("RS")){ + if(op.getName().equals("FS")){ + return null; + } + assert op.getChildOperators().size() <= 1; + op = op.getChildOperators().get(0); + } + return (ReduceSinkOperator)op; + } + + /** + * Find all peer ReduceSinkOperators (which have the same child operator of op) of op (op included). 
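
A minimal sketch of the two traversals used here — peers are the parents of a ReduceSink's single child, and the next downstream ReduceSink is found by walking single-child chains until an "RS" or a FileSink — on an invented stand-in for the operator DAG (not Hive's Operator API):

import java.util.*;

// Illustrative stand-in for the operator graph; names like "RS"/"FS"/"JOIN" are demo labels.
class Op {
    final String name;
    final List<Op> parents = new ArrayList<>();
    final List<Op> children = new ArrayList<>();
    Op(String name) { this.name = name; }
    static void link(Op parent, Op child) {
        parent.children.add(child);
        child.parents.add(parent);
    }

    // Peers of a ReduceSink: all parents of its single child (the ReduceSink itself included).
    static List<Op> findPeers(Op rs) {
        return rs.children.get(0).parents;
    }

    // Walk down single-child chains until the next "RS"; give up at a FileSink.
    static Op findNextChildReduceSink(Op rs) {
        Op cur = rs.children.get(0);
        while (!cur.name.equals("RS")) {
            if (cur.name.equals("FS") || cur.children.isEmpty()) {
                return null;
            }
            cur = cur.children.get(0);
        }
        return cur;
    }

    public static void main(String[] args) {
        Op rs1 = new Op("RS"), rs2 = new Op("RS"), join = new Op("JOIN"),
           rs3 = new Op("RS"), gby = new Op("GBY");
        link(rs1, join); link(rs2, join); link(join, rs3); link(rs3, gby);
        System.out.println(findPeers(rs1).size());              // 2: rs1 and rs2 feed the same join
        System.out.println(findNextChildReduceSink(rs1).name);  // RS: the shuffle below the join
    }
}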
+ */ + private ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op){ + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for(Operator parent: children.get(0).getParentOperators()){ + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + private ArrayList findCorrelatedReduceSinkOperators(Operator op, + HashSet keyColumns, IntraQueryCorrelation correlation) throws Exception{ + + LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName()); + + ArrayList correlatedReduceSinkOps = new ArrayList(); + if(op.getParentOperators() == null){ + return correlatedReduceSinkOps; + } + if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)){ + assert op.getParentOperators().size() == 1; + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators( + (Operator)op.getParentOperators().get(0), keyColumns, correlation)); + }else if(originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)){ + HashSet newKeyColumns = new HashSet(); + for (String keyColumn: keyColumns){ + ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn); + if(col instanceof ExprNodeColumnDesc){ + newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn()); + } + } + + if(op.getName().equals("JOIN")){ + HashSet tableNeedToCheck = new HashSet(); + for (String keyColumn: keyColumns){ + for(ColumnInfo cinfo: originalOpParseCtx.get(op).getRowResolver().getColumnInfos()){ + if(keyColumn.equals(cinfo.getInternalName())){ + tableNeedToCheck.add(cinfo.getTabAlias()); + } + } + } + + for (Object parent: op.getParentOperators()){ + assert originalOpParseCtx.get(parent).getRowResolver().getTableNames().size() == 1; + for(String tbl: originalOpParseCtx.get(parent).getRowResolver().getTableNames()){ + if(tableNeedToCheck.contains(tbl)){ + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)parent, newKeyColumns, correlation)); + break; + } + } + } + + }else{ + assert op.getParentOperators().size() == 1; + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)op.getParentOperators().get(0), newKeyColumns, correlation)); + } + + }else if(originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator){ + + HashSet newKeyColumns = new HashSet(); + for (String keyColumn: keyColumns){ + ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn); + if(col instanceof ExprNodeColumnDesc){ + newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn()); + } + } + + ReduceSinkOperator rsop = (ReduceSinkOperator)op; + HashSet thisKeyColumns = new HashSet(); + for(ExprNodeDesc key: rsop.getConf().getKeyCols()){ + if (key instanceof ExprNodeColumnDesc){ + thisKeyColumns.add(((ExprNodeColumnDesc)key).getColumn()); + } + } + + // if the intersection of newKeyColumns and thisKeyColumns is not empty, isCorrelated is true + boolean isCorrelated = false; + // TODO: should use if intersection is empty to evaluate if two corresponding operators are correlated + Set intersection = new HashSet(newKeyColumns); + intersection.retainAll(thisKeyColumns); + isCorrelated = !(intersection.isEmpty()); + + ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop); + + if(isCorrelated){ + if(nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("JOIN")){ + if (intersection.size() != 
nextChildReduceSinkOperator.getConf().getKeyCols().size() || + intersection.size() != rsop.getConf().getKeyCols().size()){ + isCorrelated = false; + } + } + } + + if (isCorrelated){ + if (((Operator)(op.getChildOperators().get(0))).getName().equals("JOIN")){ + ArrayList peers = findPeerReduceSinkOperators(rsop); + correlatedReduceSinkOps.addAll(peers); + }else{ + correlatedReduceSinkOps.add(rsop); + } + if(nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("GBY") && + (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())){ + correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator); + } + + }else{ + correlatedReduceSinkOps.clear(); + correlation.getRSGBYToBeReplacedByGBYRSGBY().clear(); + } + }else{ + throw new Exception("Correlation optimizer: ReduceSinkOperator " + op.getIdentifier() + " does not have ColumnExprMap"); + } + return correlatedReduceSinkOps; + } + + private ArrayList exploitJFC(ReduceSinkOperator op, CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation){ + + correlationCtx.addWalked(op); + correlation.addToAllReduceSinkOperators(op); + + ArrayList ReduceSinkOperators = new ArrayList(); + + boolean sholdDetect = true; + + ArrayList keys = op.getConf().getKeyCols(); + HashSet keyColumns = new HashSet(); + for(ExprNodeDesc key: keys){ + if (!(key instanceof ExprNodeColumnDesc)){ + sholdDetect = false; + }else{ + keyColumns.add(((ExprNodeColumnDesc)key).getColumn()); + } + } + + if (sholdDetect){ + ArrayList newReduceSinkOperators = new ArrayList(); + for (Operator parent: op.getParentOperators()){ + try { + ArrayList correlatedReduceSinkOperators = + findCorrelatedReduceSinkOperators(parent, keyColumns, correlation); + if (correlatedReduceSinkOperators == null || correlatedReduceSinkOperators.size() == 0){ + newReduceSinkOperators.add(op); + }else{ + ArrayList deduplicatedCorrelatedReduceSinkOperators = new ArrayList(); + for(ReduceSinkOperator rsop: correlatedReduceSinkOperators){ + if(!deduplicatedCorrelatedReduceSinkOperators.contains(rsop)){ + deduplicatedCorrelatedReduceSinkOperators.add(rsop); + } + } + for(ReduceSinkOperator rsop: deduplicatedCorrelatedReduceSinkOperators){ + if ( !correlation.getUp2downRSops().containsKey(op) ){ + correlation.getUp2downRSops().put(op, new ArrayList()); + } + correlation.getUp2downRSops().get(op).add(rsop); + + if ( !correlation.getDown2upRSops().containsKey(rsop)){ + correlation.getDown2upRSops().put(rsop, new ArrayList()); + } + correlation.getDown2upRSops().get(rsop).add(op); + ArrayList exploited = exploitJFC(rsop, correlationCtx, correlation); + if (exploited == null || exploited.size() == 0){ + newReduceSinkOperators.add(rsop); + }else{ + newReduceSinkOperators.addAll(exploited); + } + } + } + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + ReduceSinkOperators.clear(); + ReduceSinkOperators.addAll(newReduceSinkOperators); + } + return ReduceSinkOperators; + } + + private TableScanOperator findTableScanOPerator(Operator startPoint){ + Operator thisOp = (Operator) startPoint.getParentOperators().get(0); + while(true){ + if(thisOp.getName().equals("RS")) { + return null; + }else if(thisOp.getName().equals("TS")){ + return (TableScanOperator)thisOp; + } + else{ + if (thisOp.getParentOperators() != null) { + thisOp = (Operator) thisOp.getParentOperators().get(0); + } + else{ + break; + } + } + } + return null; + } + + private void annotateOpPlan(IntraQueryCorrelation correlation){ + HashMap 
bottomReduceSink2OpPlanMap = new HashMap(); + int count = 0; + + for(ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()){ + if (!bottomReduceSink2OpPlanMap.containsKey(rsop)){ + bottomReduceSink2OpPlanMap.put(rsop, count); + for(ReduceSinkOperator peerRSop: findPeerReduceSinkOperators(rsop)){ + if(correlation.getBottomReduceSinkOperators().contains(peerRSop)){ + bottomReduceSink2OpPlanMap.put(peerRSop, count); + } + } + count++; + } + } + correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OpPlanMap); + } + + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, + Object... nodeOutputs) throws SemanticException { + + CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx)ctx; + + ReduceSinkOperator op = (ReduceSinkOperator)nd; + + if (correlationCtx.isWalked(op)){ + return null; + } + + if (op.getConf().getKeyCols().size() == 0 || + (!op.getChildOperators().get(0).getName().equals("JOIN") && + !op.getChildOperators().get(0).getName().equals("GBY"))){ + correlationCtx.addWalked(op); + return null; + } + + // 1: find out correlation + IntraQueryCorrelation correlation = new IntraQueryCorrelation(); + ArrayList peerReduceSinkOperators = findPeerReduceSinkOperators(op); + ArrayList bottomReduceSinkOperators = new ArrayList(); + for(ReduceSinkOperator rsop: peerReduceSinkOperators){ + + ArrayList thisBottomReduceSinkOperators= exploitJFC(rsop, correlationCtx, correlation); + + if(peerReduceSinkOperators.size() == 1){ + correlation.addToRSGBYToBeReplacedByGBYRSGBY(rsop); + } + if (thisBottomReduceSinkOperators.size() == 0){ + thisBottomReduceSinkOperators.add(rsop); + }else{ + boolean isClear = false; + for(ReduceSinkOperator bottomrsop: thisBottomReduceSinkOperators){ + TableScanOperator tsop = findTableScanOPerator(bottomrsop); + if (tsop == null){ + isClear = true; // currently the optimizer can only optimize correlations involving source tables + }else{ + if ( !correlation.getTop2TSops().containsKey(rsop) ){ + correlation.getTop2TSops().put(rsop, new ArrayList()); + } + correlation.getTop2TSops().get(rsop).add(tsop); + + if ( !correlation.getBottom2TSops().containsKey(bottomrsop)){ + correlation.getBottom2TSops().put(bottomrsop, new ArrayList()); + } + correlation.getBottom2TSops().get(bottomrsop).add(tsop); + } + } + if(isClear){ + thisBottomReduceSinkOperators.clear(); + thisBottomReduceSinkOperators.add(rsop); + } + } + bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators); + } + + if(!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)){ + LOG.info("has job flow correlation"); + correlation.setJobFlowCorrelation(true); + correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators); + annotateOpPlan(correlation); + } + + if (correlation.hasJobFlowCorrelation()){ + boolean hasICandTC = findICandTC(correlation); + LOG.info("has input correlation and transit correlation? " + hasICandTC); + correlation.setInputCorrelation(hasICandTC); + correlation.setTransitCorrelation(hasICandTC); + boolean isInvolve = isInvolveSelfJoin(correlation); + LOG.info("involve self-join? " + isInvolve); + correlation.setInvolveSelfJoin(isInvolve); + //TODO: support self-join involved cases. For self-join related operation paths, after the correlation dispatch operator, each path should be filtered by a + // filter operator + if(!isInvolve){ + LOG.info("correlation detected"); + correlationCtx.addCorrelation(correlation); + }else{ + LOG.info("correlation discarded. 
The current optimizer cannot optimize it"); + } + } + + correlationCtx.addWalked(op); + + return null; + } + + private boolean isInvolveSelfJoin(IntraQueryCorrelation correlation){ + boolean isInvolve = false; + for(Entry> entry: correlation.getTable2CorrelatedRSops().entrySet()){ + for(ReduceSinkOperator rsop: entry.getValue()){ + HashSet intersection = new HashSet(findPeerReduceSinkOperators(rsop)); + intersection.retainAll(entry.getValue()); + + // if involve self-join + if(intersection.size() > 1){ + isInvolve = true; + return isInvolve; + } + } + } + return isInvolve; + } + + private boolean findICandTC(IntraQueryCorrelation correlation){ + + boolean hasICandTC = false; + HashMap> table2RSops = new HashMap>(); + HashMap> table2TSops = new HashMap>(); + + for (Entry> entry: correlation.getBottom2TSops().entrySet()){ + String tbl = AliastoTabName.get(entry.getValue().get(0).getConf().getAlias()); + if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)){ + table2RSops.put(tbl, new ArrayList()); + table2TSops.put(tbl, new ArrayList()); + } + assert entry.getValue().size() == 1; + table2RSops.get(tbl).add(entry.getKey()); + table2TSops.get(tbl).add(entry.getValue().get(0)); + } + + for(Entry> entry: table2RSops.entrySet()){ + if(entry.getValue().size() > 1){ + hasICandTC = true; + } + } + + + correlation.setICandTCCorrelation(table2RSops, table2TSops); + + return hasICandTC; + } + } + + private NodeProcessor getDefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + // do nothing + return null; + } + }; + } + + private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx { + + } + + + public class IntraQueryCorrelation{ + + private final HashMap> down2upRSops = new HashMap>(); + private final HashMap> up2downRSops = new HashMap>(); + + private final HashMap> top2TSops = new HashMap>(); + private final HashMap> bottom2TSops = new HashMap>(); + + private ArrayList topReduceSinkOperators; + private ArrayList bottomReduceSinkOperators; + + private HashMap> table2CorrelatedRSops; + + private HashMap> table2CorrelatedTSops; + + private HashMap bottomReduceSink2OperationPathMap; + + private final HashMap>> dispatchConf = + new HashMap>>(); //inputTag->(Child->outputTag) + private final HashMap>> dispatchValueSelectDescConf = + new HashMap>>(); //inputTag->(Child->SelectDesc) + private final HashMap>> dispatchKeySelectDescConf = + new HashMap>>(); //inputTag->(Child->SelectDesc) + + private final HashSet allReduceSinkOperators = new HashSet(); + + // this set contains all ReduceSink-GroupBy operator-pairs that should be be replaced by GroupBy-ReduceSink-GroupBy pattern. + // the type of first GroupByOperator is hash type and this one will be used to group records. 
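
The hash-mode GroupBy mentioned in the comment above buffers partial aggregates per key before the shuffle, so the rows reaching the ReduceSink are already grouped; a dependency-free sketch of that idea (toy code, not Hive's GroupByOperator):

import java.util.*;

// Illustrative only: why a hash-mode GroupBy can sit in front of the shuffle.
class HashSidePreAggregation {
    public static void main(String[] args) {
        List<String> incomingKeys = List.of("a", "b", "a", "c", "b", "a");

        // map-side, hash-mode GroupBy: partial counts kept in a hash table
        Map<String, Long> partial = new HashMap<>();
        for (String key : incomingKeys) {
            partial.merge(key, 1L, Long::sum);
        }

        // what would be emitted to the ReduceSink: one row per distinct key
        partial.forEach((key, count) ->
            System.out.println("emit key=" + key + " partialCount=" + count));
        // the reduce-side GroupBy then merges partial counts that share a key
    }
}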
+ private final HashSet rSGBYToBeReplacedByGBYRSGBY = new HashSet(); + + public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop){ + rSGBYToBeReplacedByGBYRSGBY.add(rsop); + } + + public HashSet getRSGBYToBeReplacedByGBYRSGBY(){ + return rSGBYToBeReplacedByGBYRSGBY; + } + + public void addToAllReduceSinkOperators(ReduceSinkOperator rsop){ + allReduceSinkOperators.add(rsop); + } + + public HashSet getAllReduceSinkOperators(){ + return allReduceSinkOperators; + } + + public HashMap>> getDispatchConf(){ + return dispatchConf; + } + + public HashMap>> getDispatchValueSelectDescConf(){ + return dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf(){ + return dispatchKeySelectDescConf; + } + + public void addOperationPathToDispatchConf(Integer opPlan){ + if(!dispatchConf.containsKey(opPlan)) { + dispatchConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchConfForOperationPath(Integer opPlan){ + return dispatchConf.get(opPlan); + } + + public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan){ + if(!dispatchValueSelectDescConf.containsKey(opPlan)) { + dispatchValueSelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchValueSelectDescConfForOperationPath(Integer opPlan){ + return dispatchValueSelectDescConf.get(opPlan); + } + + public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan){ + if(!dispatchKeySelectDescConf.containsKey(opPlan)) { + dispatchKeySelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchKeySelectDescConfForOperationPath(Integer opPlan){ + return dispatchKeySelectDescConf.get(opPlan); + } + + private boolean inputCorrelation = false; + private boolean transitCorrelation = false; + private boolean jobFlowCorrelation = false; + + public void setBottomReduceSink2OperationPathMap(HashMap bottomReduceSink2OperationPathMap){ + this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap; + } + + public HashMap getBottomReduceSink2OperationPathMap(){ + return bottomReduceSink2OperationPathMap; + } + + public void setInputCorrelation(boolean inputCorrelation){ + this.inputCorrelation = inputCorrelation; + } + + public boolean hasInputCorrelation(){ + return inputCorrelation; + } + + public void setTransitCorrelation(boolean transitCorrelation){ + this.transitCorrelation = transitCorrelation; + } + + public boolean hasTransitCorrelation(){ + return transitCorrelation; + } + + public void setJobFlowCorrelation(boolean jobFlowCorrelation){ + this.jobFlowCorrelation = jobFlowCorrelation; + } + + public boolean hasJobFlowCorrelation(){ + return jobFlowCorrelation; + } + + public HashMap> getTop2TSops(){ + return top2TSops; + } + + public HashMap> getBottom2TSops(){ + return bottom2TSops; + } + + public HashMap> getDown2upRSops(){ + return down2upRSops; + } + + public HashMap> getUp2downRSops(){ + return up2downRSops; + } + + public void setJFCCorrelation(ArrayList peerReduceSinkOperators, + ArrayList bottomReduceSinkOperators){ + this.topReduceSinkOperators = peerReduceSinkOperators; + this.bottomReduceSinkOperators = bottomReduceSinkOperators; + } + + + public ArrayList getTopReduceSinkOperators(){ + return topReduceSinkOperators; + } + + public ArrayList getBottomReduceSinkOperators(){ + return bottomReduceSinkOperators; + } + + public void setICandTCCorrelation(HashMap> table2RSops, + HashMap> table2TSops){ + this.table2CorrelatedRSops = table2RSops; + this.table2CorrelatedTSops = table2TSops; + } + + public HashMap> 
getTable2CorrelatedRSops(){ + return table2CorrelatedRSops; + } + + public HashMap> getTable2CorrelatedTSops(){ + return table2CorrelatedTSops; + } + + private boolean isInvolveSelfJoin = false; + + public boolean isInvolveSelfJoin() { + return isInvolveSelfJoin; + } + + public void setInvolveSelfJoin(boolean isInvolveSelfJoin) { + this.isInvolveSelfJoin = isInvolveSelfJoin; + } + + } + + private class CorrelationNodeProcCtx implements NodeProcessorCtx { + + private final HashSet walked = new HashSet(); + + private final ArrayList correlations = new ArrayList(); + + public void addCorrelation(IntraQueryCorrelation correlation){ + correlations.add(correlation); + } + + public ArrayList getCorrelations(){ + return correlations; + } + + + public boolean isWalked(ReduceSinkOperator op){ + return walked.contains(op); + } + + public void addWalked(ReduceSinkOperator op){ + walked.add(op); + } + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (working copy) @@ -61,6 +61,7 @@ private Reporter rp; private boolean abort = false; private boolean isTagged = false; + private boolean isOperationPathTagged = false; //If operation plan is tagged private long cntr = 0; private long nextCntr = 1; @@ -116,6 +117,7 @@ reducer.setParentOperators(null); // clear out any parents as reducer is the // root isTagged = gWork.getNeedsTagging(); + isOperationPathTagged = gWork.getNeedsOperationPathTagging(); try { keyTableDesc = gWork.getKeyDesc(); inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc @@ -164,8 +166,9 @@ private BytesWritable groupKey; - ArrayList row = new ArrayList(3); + ArrayList row = new ArrayList(4); ByteWritable tag = new ByteWritable(); + ByteWritable operationPathTags = new ByteWritable(); public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { @@ -187,6 +190,14 @@ tag.set(keyWritable.get()[size]); keyWritable.setSize(size); } + + operationPathTags.set((byte)0); + if (isOperationPathTagged){ + // remove the operation plan tag + int size = keyWritable.getSize() - 1; + operationPathTags.set(keyWritable.get()[size]); + keyWritable.setSize(size); + } if (!keyWritable.equals(groupKey)) { // If a operator wants to do some work at the beginning of a group @@ -212,6 +223,7 @@ l4j.trace("Start Group"); reducer.startGroup(); reducer.setGroupKeyObject(keyObject); + reducer.setBytesWritableGroupKey(groupKey); } // System.err.print(keyObject.toString()); while (values.hasNext()) { @@ -234,6 +246,7 @@ row.add(valueObject[tag.get()]); // The tag is not used any more, we should remove it. 
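
The ExecReducer hunks above reserve the last byte of the serialized reduce key for an operation-path tag and shave it off before the key is deserialized; a standalone sketch of that convention, with plain byte arrays standing in for BytesWritable:

import java.util.Arrays;

// Illustrative sketch of the tag-byte convention: the last byte of the serialized
// reduce key carries the operation-path tag, and the reducer strips it before
// deserializing the key (plain byte[] used here instead of BytesWritable).
class OperationPathTagDemo {
    static byte[] appendTag(byte[] key, byte tag) {
        byte[] tagged = Arrays.copyOf(key, key.length + 1);
        tagged[key.length] = tag;
        return tagged;
    }

    // returns the tag; reports the shortened usable length, the way setSize() does
    static byte readAndStripTag(byte[] taggedKey, int[] usableLength) {
        usableLength[0] = taggedKey.length - 1;
        return taggedKey[taggedKey.length - 1];
    }

    public static void main(String[] args) {
        byte[] key = {10, 20, 30};
        byte[] tagged = appendTag(key, (byte) 2);   // map side: tag the key with its path
        int[] len = new int[1];
        byte tag = readAndStripTag(tagged, len);    // reduce side: recover tag, keep key prefix
        System.out.println("tag=" + tag + " keyLength=" + len[0]); // tag=2 keyLength=3
    }
}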
row.add(tag); + row.add(operationPathTags); if (isLogInfoEnabled) { cntr++; if (cntr == nextCntr) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.OutputCollector; @@ -55,6 +56,7 @@ protected List> childOperators; protected List> parentOperators; + protected int numberOfParentOperators; protected String operatorId; /** * List of counter names associated with the operator. It contains the @@ -345,6 +347,12 @@ } } + if(parentOperators == null){ + numberOfParentOperators = 0; + }else{ + numberOfParentOperators = parentOperators.size(); + } + if (inputObjInspectors.length == 0) { throw new HiveException("Internal Error during operator initialization."); } @@ -354,6 +362,8 @@ //pass the exec context to child operators passExecContext(this.execContext); + numOfClosedParentOperators = 0; + initializeOp(hconf); LOG.info("Initialization Done " + id + " " + getName()); } @@ -526,6 +536,13 @@ return true; } + // the number of parent operators that has been closed + transient int numOfClosedParentOperators; + + public int getNumOfClosedParentOperators(){ + return numOfClosedParentOperators; + } + // This close() function does not need to be synchronized // since it is called by its parents' main thread, so no // more than 1 thread should call this close() function. @@ -537,6 +554,7 @@ // check if all parents are finished if (!allInitializedParentsAreClosed()) { + numOfClosedParentOperators++; return; } @@ -555,6 +573,7 @@ // call the operator specific close routine closeOp(abort); + numOfClosedParentOperators = 0; try { logStats(); @@ -565,7 +584,7 @@ for (Operator op : childOperators) { op.close(abort); } - + LOG.info(id + " Close done"); } catch (HiveException e) { e.printStackTrace(); @@ -1332,4 +1351,17 @@ public void cleanUpInputFileChangedOp() throws HiveException { } + protected BytesWritable bytesWritableGroupKey; + + public void setBytesWritableGroupKey(BytesWritable groupKey) { + if(bytesWritableGroupKey == null){ + bytesWritableGroupKey = new BytesWritable(); + } + bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize()); + } + + public BytesWritable getBytesWritableGroupKey() { + return bytesWritableGroupKey; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (revision 0) @@ -0,0 +1,729 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator; +import org.apache.hadoop.hive.ql.exec.CorrelationFakeReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.CorrelationReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationDispatchDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.ForwardDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; + + +public final class CorrelationOptimizerUtils { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName()); + + public static boolean isExisted(ExprNodeDesc expr, ArrayList col_list){ + for(ExprNodeDesc thisExpr: col_list){ + if(expr.getExprString().equals(thisExpr.getExprString())){ + return true; + } + } + return false; + } + + public static String getColumnName(Map opColumnExprMap, ExprNodeDesc expr) { + for(Entry entry: opColumnExprMap.entrySet()){ + if(expr.getExprString().equals(entry.getValue().getExprString())){ + return entry.getKey(); + } + } + return null; + } + + + public static Operator 
unionUsedColumnsAndMakeNewSelect(ArrayList rsops, + IntraQueryCorrelation correlation, LinkedHashMap, + Map> originalOpColumnExprMap, TableScanOperator input, ParseContext pGraphContext, + LinkedHashMap, OpParseContext> originalOpParseCtx){ + + ArrayList columnNames = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList col_list = new ArrayList(); + RowResolver out_rwsch = new RowResolver(); + boolean isSelectAll = false; + + int pos = 0; + for(ReduceSinkOperator rsop: rsops){ + Operator curr = correlation.getBottom2TSops().get(rsop).get(0).getChildOperators().get(0); + while(true){ + if (curr.getName().equals("SEL")){ + SelectOperator selOp = (SelectOperator)curr; + if(selOp.getColumnExprMap() != null){ + for(Entry entry: selOp.getColumnExprMap().entrySet()){ + ExprNodeDesc expr = entry.getValue(); + if(!isExisted(expr, col_list) && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().containsKey(entry.getKey())){ + col_list.add(expr); + String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().get(entry.getKey()); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = entry.getKey(); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + pos++; + columnNames.add(outputName); + colExprMap.put(outputName, expr); + } + } + }else{ + for(ExprNodeDesc expr: selOp.getConf().getColList()){ + if(!isExisted(expr, col_list)){ + col_list.add(expr); + String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver().getInvRslvMap().get(expr.getCols().get(0)); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + } + break; + }else if (curr.getName().equals("FIL")){ + isSelectAll = true; + break; + }else if(curr.getName().equals("RS")){ + ReduceSinkOperator thisRSop = (ReduceSinkOperator)curr; + for(ExprNodeDesc expr: thisRSop.getConf().getKeyCols()){ + if(!isExisted(expr, col_list)){ + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + for(ExprNodeDesc expr: thisRSop.getConf().getValueCols()){ + if(!isExisted(expr, col_list)){ + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + + break; + }else{ + curr = curr.getChildOperators().get(0); + } + } + } + + Operator output; + if (isSelectAll){ + output = input; + }else{ + output = 
putOpInsertMap(OperatorFactory.getAndMakeChild( + new SelectDesc(col_list, columnNames, false), new RowSchema( + out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx()); + output.setColumnExprMap(colExprMap); + output.setChildOperators(Utilities.makeList()); + + } + + return output; + } + + + public static Operator putOpInsertMap(Operator op, + RowResolver rr, LinkedHashMap, OpParseContext> opParseCtx) { + OpParseContext ctx = new OpParseContext(rr); + opParseCtx.put(op, ctx); + op.augmentPlan(); + return op; + } + + public static HashMap, String> getAliasIDtTopOps(HashMap> topOps){ + HashMap, String> aliasIDtTopOps = new HashMap, String>(); + for(Entry> entry: topOps.entrySet()){ + assert !aliasIDtTopOps.containsKey(entry.getValue()); + aliasIDtTopOps.put(entry.getValue(), entry.getKey()); + } + return aliasIDtTopOps; + } + + public static ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op){ + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for(Operator parent: children.get(0).getParentOperators()){ + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + public static ArrayList findPeerFakeReduceSinkOperators(CorrelationFakeReduceSinkOperator op){ + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for(Operator parent: children.get(0).getParentOperators()){ + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((CorrelationFakeReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + // find how many layer's of Fake reduce sink + public static int getPostComputationDepth(IntraQueryCorrelation correlation){ + int depth = 0; + for(ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()){ + ReduceSinkOperator op = rsop; + int layer = 0; + while(!correlation.getTopReduceSinkOperators().contains(op)){ + assert correlation.getDown2upRSops().get(op).size() == 1; + op = correlation.getDown2upRSops().get(op).get(0); + layer++; + } + if(layer > depth){ + depth = layer; + } + } + assert depth >= 1; + return depth; + } + + + public static int getPostComputationDepthOfThisPlan(ReduceSinkOperator rsop, IntraQueryCorrelation correlation){ + int depth = 0; + ReduceSinkOperator op = rsop; + while(!correlation.getTopReduceSinkOperators().contains(op)){ + assert correlation.getDown2upRSops().get(op).size() == 1; + op = correlation.getDown2upRSops().get(op).get(0); + depth++; + } + assert depth >= 1; + return depth; + } + + public static ParseContext applyCorrelation(IntraQueryCorrelation correlation, ParseContext inputpGraphContext, + LinkedHashMap, Map> originalOpColumnExprMap, + LinkedHashMap, RowResolver> originalOpRowResolver, + Map groupbyRegular2MapSide, + LinkedHashMap, OpParseContext> originalOpParseCtx){ + + ParseContext pGraphContext = inputpGraphContext; + + // 0: if necessary, replace RS-GBY to GBY-RS-GBY. 
In GBY-RS-GBY, the first GBY is in type of hash, so it can group records + LOG.info("apply correlation step 0: replace RS-GBY to GBY-RS-GBY"); + for(ReduceSinkOperator rsop: correlation.getRSGBYToBeReplacedByGBYRSGBY()){ + LOG.info("operator " + rsop.getIdentifier() + " should be replaced"); + assert !correlation.getBottomReduceSinkOperators().contains(rsop); + GroupByOperator mapSideGBY = groupbyRegular2MapSide.get(rsop); + assert (mapSideGBY.getChildOperators().get(0).getChildOperators().get(0) instanceof GroupByOperator); + ReduceSinkOperator newRsop = (ReduceSinkOperator)mapSideGBY.getChildOperators().get(0); + GroupByOperator reduceSideGBY = (GroupByOperator)newRsop.getChildOperators().get(0); + GroupByOperator oldReduceSideGBY = (GroupByOperator)rsop.getChildOperators().get(0); + List> parents = rsop.getParentOperators(); + List> children = oldReduceSideGBY.getChildOperators(); + mapSideGBY.setParentOperators(parents); + for(Operator parent: parents){ + parent.replaceChild(rsop, mapSideGBY); + } + reduceSideGBY.setChildOperators(children); + for(Operator child: children){ + child.replaceParent(oldReduceSideGBY, reduceSideGBY); + } + correlation.getAllReduceSinkOperators().remove(rsop); + correlation.getAllReduceSinkOperators().add(newRsop); + } + + + Operator curr; + + + // 1: Create table scan operator + LOG.info("apply correlation step 1: create table scan operator"); + HashMap oldTSOP2newTSOP = new HashMap(); + HashMap> oldTopOps = pGraphContext.getTopOps(); + HashMap, String> oldAliasIDtTopOps = getAliasIDtTopOps(oldTopOps); + HashMap oldTopToTable = pGraphContext.getTopToTable(); + HashMap> addedTopOps = new HashMap>(); + HashMap addedTopToTable = new HashMap(); + for(Entry> entry: correlation.getTable2CorrelatedTSops().entrySet()){ + TableScanOperator oldTSop = entry.getValue().get(0); + TableScanDesc tsDesc = new TableScanDesc(oldTSop.getConf().getAlias(), oldTSop.getConf().getVirtualCols()); + OpParseContext opParseCtx= pGraphContext.getOpParseCtx().get(oldTSop); + Operator top = putOpInsertMap(OperatorFactory.get(tsDesc, + new RowSchema(opParseCtx.getRowResolver().getColumnInfos())), + opParseCtx.getRowResolver(), pGraphContext.getOpParseCtx()); + top.setParentOperators(null); + top.setChildOperators(Utilities.makeList()); + for(TableScanOperator tsop: entry.getValue()){ + addedTopOps.put(oldAliasIDtTopOps.get(tsop), top); + addedTopToTable.put((TableScanOperator) top, oldTopToTable.get(tsop)); + oldTSOP2newTSOP.put(tsop, (TableScanOperator)top); + } + } + + int postComputationDepth = getPostComputationDepth(correlation); + ArrayList> childrenOfDispatch = new ArrayList>(); + for(ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()){ + int thisPostComputationDepth = getPostComputationDepthOfThisPlan(rsop, correlation); + // TODO: currently, correlation optimizer can not handle the case that + // a table is directly connected to a operator. 
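
Step 0 above swaps an RS-GBY pair for a prebuilt GBY-RS-GBY chain purely by repointing parent/child edges (setParentOperators, replaceChild, replaceParent); the same splice pattern on an invented mini operator graph (illustrative only, not Hive's Operator class):

import java.util.*;

// Illustrative: swap a segment of an operator chain for a replacement chain by
// repointing edges only, the way step 0 splices GBY-RS-GBY in place of RS-GBY.
class DagNode {
    final String name;
    final List<DagNode> parents = new ArrayList<>();
    final List<DagNode> children = new ArrayList<>();
    DagNode(String name) { this.name = name; }
    static void link(DagNode p, DagNode c) { p.children.add(c); c.parents.add(p); }

    static void splice(DagNode oldFirst, DagNode oldLast, DagNode newFirst, DagNode newLast) {
        // parents of the old segment now feed the replacement segment
        for (DagNode p : oldFirst.parents) {
            p.children.set(p.children.indexOf(oldFirst), newFirst);
            newFirst.parents.add(p);
        }
        // children of the old segment now read from the replacement segment
        for (DagNode c : oldLast.children) {
            c.parents.set(c.parents.indexOf(oldLast), newLast);
            newLast.children.add(c);
        }
    }

    public static void main(String[] args) {
        DagNode sel = new DagNode("SEL"), rs = new DagNode("RS"),
                gby = new DagNode("GBY"), fs = new DagNode("FS");
        link(sel, rs); link(rs, gby); link(gby, fs);

        DagNode mapGby = new DagNode("GBY(hash)"), newRs = new DagNode("RS"),
                redGby = new DagNode("GBY(merge)");
        link(mapGby, newRs); link(newRs, redGby);

        splice(rs, gby, mapGby, redGby);               // RS-GBY replaced by GBY-RS-GBY
        System.out.println(sel.children.get(0).name);  // GBY(hash)
        System.out.println(fs.parents.get(0).name);    // GBY(merge)
    }
}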
+ assert correlation.getBottomReduceSinkOperators().containsAll(findPeerReduceSinkOperators(rsop)); + Operator op = rsop.getChildOperators().get(0); + if (!childrenOfDispatch.contains(op)){ + LOG.info("Add :" + op.getIdentifier() + " " + op.getName() + " to the children list of dispatch operator"); + childrenOfDispatch.add(op); + } + } + + int opTag = 0; + HashMap operationPath2CorrelationReduceSinkOps = new HashMap(); + for(Entry> entry: correlation.getTable2CorrelatedRSops().entrySet()){ + + // 2: Create select operator for shared op plans + LOG.info("apply correlation step 2: create select operator for shared operation path for the table of " + entry.getKey()); + curr = unionUsedColumnsAndMakeNewSelect(entry.getValue(), correlation, originalOpColumnExprMap, + oldTSOP2newTSOP.get(correlation.getBottom2TSops().get(entry.getValue().get(0)).get(0)), pGraphContext, + originalOpParseCtx); + + // 3: Create CorrelationCompositeOperator, CorrelationReduceSinkOperator + LOG.info("apply correlation step 3: create correlation composite Operator and correlation reduce sink operator for the table of " + entry.getKey()); + curr = createCorrelationCompositeReducesinkOperaotr( + correlation.getTable2CorrelatedTSops().get(entry.getKey()), entry.getValue(), correlation, curr, pGraphContext, + childrenOfDispatch, entry.getKey(), originalOpColumnExprMap, opTag, originalOpRowResolver); + operationPath2CorrelationReduceSinkOps.put(new Integer(opTag), (CorrelationReduceSinkOperator)curr); + opTag++; + } + + + // 4: Create correlation dispatch operator for operation paths + LOG.info("apply correlation step 4: create correlation dispatch operator for operation paths"); + RowResolver outputRS = new RowResolver(); + List> correlationReduceSinkOps = new ArrayList>(); + for(Entry entry: operationPath2CorrelationReduceSinkOps.entrySet()){ + Integer opTagInteger = entry.getKey(); + curr = entry.getValue(); + correlationReduceSinkOps.add((CorrelationReduceSinkOperator)curr); + RowResolver inputRS = pGraphContext.getOpParseCtx().get(curr).getRowResolver(); + for(Entry> e1: inputRS.getRslvMap().entrySet()){ + for(Entry e2: e1.getValue().entrySet()){ + outputRS.put(e1.getKey(), e2.getKey(), e2.getValue()); + } + } + } + + Operator dispatchOp = putOpInsertMap(OperatorFactory.get( + new CorrelationDispatchDesc(correlation.getDispatchConf(), correlation.getDispatchKeySelectDescConf(), correlation.getDispatchValueSelectDescConf()), + new RowSchema(outputRS.getColumnInfos())), + outputRS, pGraphContext.getOpParseCtx()); + + dispatchOp.setParentOperators(correlationReduceSinkOps); + for(Operator thisOp: correlationReduceSinkOps){ + thisOp.setChildOperators(Utilities.makeList(dispatchOp)); + } + + // 5: Replace the old plan in the original plan tree with new plan + LOG.info("apply correlation step 5: Replace the old plan in the original plan tree with the new plan"); + HashSet> processed = new HashSet>(); + for(Operator op: childrenOfDispatch){ + ArrayList> parents = new ArrayList>(); + for(Operator oldParent: op.getParentOperators()){ + if(!correlation.getBottomReduceSinkOperators().contains(oldParent)){ + parents.add(oldParent); + } + } + parents.add(dispatchOp); + op.setParentOperators(parents); + } + dispatchOp.setChildOperators(childrenOfDispatch); + HashMap> newTopOps = new HashMap>(); + for(Entry> entry: oldTopOps.entrySet()){ + if(addedTopOps.containsKey(entry.getKey())){ + newTopOps.put(entry.getKey(), addedTopOps.get(entry.getKey())); + }else{ + newTopOps.put(entry.getKey(), entry.getValue()); + } + } + 
pGraphContext.setTopOps(newTopOps); + HashMap newTopToTable = new HashMap(); + for(Entry entry: oldTopToTable.entrySet()){ + if(addedTopToTable.containsKey(oldTSOP2newTSOP.get(entry.getKey()))){ + newTopToTable.put(oldTSOP2newTSOP.get(entry.getKey()), + addedTopToTable.get(oldTSOP2newTSOP.get(entry.getKey()))); + }else{ + newTopToTable.put(entry.getKey(), entry.getValue()); + } + } + pGraphContext.setTopToTable(newTopToTable); + + // 6: Change each JFC related ReduceSinkOperator to a CorrelationFakeReduceSinkOperator + LOG.info("apply correlation step 6: Change each JFC related reduce sink operator to a correlation fake reduce sink operator"); + HashMap, ArrayList>> newParentsOfChildren = + new HashMap, ArrayList>>(); + for(ReduceSinkOperator rsop: correlation.getAllReduceSinkOperators()){ + if(!correlation.getBottomReduceSinkOperators().contains(rsop)){ + Operator childOP = rsop.getChildOperators().get(0); + Operator parentOP = rsop.getParentOperators().get(0); + Operator correlationFakeReduceSinkOperator = putOpInsertMap(OperatorFactory.get( + new CorrelationFakeReduceSinkDesc(rsop.getConf()), + new RowSchema(pGraphContext.getOpParseCtx().get(rsop).getRowResolver().getColumnInfos())), + pGraphContext.getOpParseCtx().get(rsop).getRowResolver(), pGraphContext.getOpParseCtx()); + correlationFakeReduceSinkOperator.setChildOperators(Utilities.makeList(childOP)); + correlationFakeReduceSinkOperator.setParentOperators(Utilities.makeList(parentOP)); + parentOP.getChildOperators().set(parentOP.getChildOperators().indexOf(rsop), correlationFakeReduceSinkOperator); + childOP.getParentOperators().set(childOP.getParentOperators().indexOf(rsop), correlationFakeReduceSinkOperator); + } + } + + return pGraphContext; + } + + public static Operator createCorrelationCompositeReducesinkOperaotr( + ArrayList tsops, ArrayList rsops, + IntraQueryCorrelation correlation, + Operator input, ParseContext pGraphContext, + ArrayList> childrenOfDispatch, String tableName, + LinkedHashMap, Map> originalOpColumnExprMap, int newTag, + LinkedHashMap, RowResolver> originalOpRowResolver){ + + // Create CorrelationCompositeOperator + RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver(); + ArrayList> tops = new ArrayList>(); + ArrayList> bottoms = new ArrayList>(); + ArrayList opTags = new ArrayList(); + + for(ReduceSinkOperator rsop: rsops){ + TableScanOperator tsop = correlation.getBottom2TSops().get(rsop).get(0); + Operator curr = tsop.getChildOperators().get(0); + if(curr == rsop){ + // no filter needed, just forward + ForwardDesc forwardCtx = new ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + }else{ + // Add filter operator + FilterOperator currFilOp = null; + while(curr != rsop){ + if (curr.getName().equals("FIL")){ + FilterOperator fil = (FilterOperator)curr; + FilterDesc filterCtx = new FilterDesc(fil.getConf().getPredicate(), false); + Operator nowFilOp = OperatorFactory.get(FilterDesc.class); + nowFilOp.setConf(filterCtx); + if(currFilOp == null){ + currFilOp = (FilterOperator)nowFilOp; + tops.add(currFilOp); + }else{ + nowFilOp.setParentOperators(Utilities.makeList(currFilOp)); + currFilOp.setChildOperators(Utilities.makeList(nowFilOp)); + currFilOp = (FilterOperator) nowFilOp; + } + } + curr = curr.getChildOperators().get(0); + } + if (currFilOp == null){ + ForwardDesc forwardCtx = new 
ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + } + else{ + bottoms.add(currFilOp); + } + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + + } + } + + int[] opTagsArray = new int[opTags.size()]; + for(int i=0; i ycop = putOpInsertMap(OperatorFactory.getAndMakeChild(ycoCtx, + new RowSchema(inputRR.getColumnInfos()), input), + inputRR, pGraphContext.getOpParseCtx()); + + // Create CorrelationReduceSinkOperator + ArrayList partitionCols = new ArrayList(); + ArrayList keyCols = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList keyOutputColumnNames = new ArrayList(); + ReduceSinkOperator firstRsop = rsops.get(0); + + RowResolver firstRsopRS = pGraphContext.getOpParseCtx().get(firstRsop).getRowResolver(); + RowResolver orginalFirstRsopRS = originalOpRowResolver.get(firstRsop); + RowResolver outputRS = new RowResolver(); + HashMap keyCol2ExprForDispatch = new HashMap(); + HashMap valueCol2ExprForDispatch = new HashMap(); + + for(ExprNodeDesc expr: firstRsop.getConf().getKeyCols()){ + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr; + String ouputName = getColumnName(originalOpColumnExprMap.get(firstRsop), expr); + ColumnInfo cinfo = orginalFirstRsopRS.getColumnInfos().get(orginalFirstRsopRS.getPosition(ouputName)); + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size()); + keyOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + + colExprMap.put(newColInfo.getInternalName(), expr); + + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + keyCols.add(expr); + + keyCol2ExprForDispatch.put(encd.getColumn(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName, + encd.getIsPartitionColOrVirtualCol())); + + } + + ArrayList valueCols = new ArrayList(); + ArrayList valueOutputColumnNames = new ArrayList(); + + correlation.addOperationPathToDispatchConf(newTag); + correlation.addOperationPathToDispatchKeySelectDescConf(newTag); + correlation.addOperationPathToDispatchValueSelectDescConf(newTag); + + + for(ReduceSinkOperator rsop: rsops){ + RowResolver rs = pGraphContext.getOpParseCtx().get(rsop).getRowResolver(); + RowResolver orginalRS = originalOpRowResolver.get(rsop); + Integer childOpIndex = childrenOfDispatch.indexOf(rsop.getChildOperators().get(0)); + int outputTag = rsop.getConf().getTag(); + if(outputTag == -1){ + outputTag = 0; + } + if(!correlation.getDispatchConfForOperationPath(newTag).containsKey(childOpIndex)){ + correlation.getDispatchConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchConfForOperationPath(newTag).get(childOpIndex).add(outputTag); + + ArrayList thisKeyColsInDispatch = new ArrayList(); + ArrayList outputKeyNamesInDispatch = new ArrayList(); + for(ExprNodeDesc expr: rsop.getConf().getKeyCols()){ + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr; + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + thisKeyColsInDispatch.add(keyCol2ExprForDispatch.get(encd.getColumn())); + String[] names = outputName.split("\\."); + outputKeyNamesInDispatch.add(names[1]); + } + + if(!correlation.getDispatchKeySelectDescConfForOperationPath(newTag).containsKey(childOpIndex)){ + 
correlation.getDispatchKeySelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchKeySelectDescConfForOperationPath(newTag).get(childOpIndex). + add(new SelectDesc(thisKeyColsInDispatch, outputKeyNamesInDispatch, false)); + + ArrayList thisValueColsInDispatch = new ArrayList(); + ArrayList outputValueNamesInDispatch = new ArrayList(); + for(ExprNodeDesc expr: rsop.getConf().getValueCols()){ + + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + ColumnInfo cinfo = orginalRS.getColumnInfos().get(orginalRS.getPosition(outputName)); + if(!valueCol2ExprForDispatch.containsKey(expr.getExprString())){ + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size() + valueCols.size()); + valueOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + colExprMap.put(newColInfo.getInternalName(), expr); + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + valueCols.add(expr); + + valueCol2ExprForDispatch.put(expr.getExprString(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName, + false)); + } + + thisValueColsInDispatch.add(valueCol2ExprForDispatch.get(expr.getExprString())); + String[] names = outputName.split("\\."); + outputValueNamesInDispatch.add(names[1]); + } + + if(!correlation.getDispatchValueSelectDescConfForOperationPath(newTag).containsKey(childOpIndex)){ + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).get(childOpIndex). + add(new SelectDesc(thisValueColsInDispatch, outputValueNamesInDispatch, false)); + } + + CorrelationReduceSinkOperator rsOp = null; + try { + rsOp = (CorrelationReduceSinkOperator) putOpInsertMap( + OperatorFactory.getAndMakeChild(getReduceSinkDesc(keyCols, + keyCols.size(), valueCols, new ArrayList>(), + keyOutputColumnNames, valueOutputColumnNames, true, newTag, keyCols.size(), + -1), new RowSchema(outputRS + .getColumnInfos()), ycop), outputRS, pGraphContext.getOpParseCtx()); + rsOp.setColumnExprMap(colExprMap); + ((CorrelationCompositeOperator)ycop).getConf().setCorrespondingCorrelationReduceSinkOperator(rsOp); + } catch (SemanticException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return rsOp; + } + + + /** + * Create the correlation reduce sink descriptor. + * + * @param keyCols + * The columns to be stored in the key + * @param numKeys number of distribution keys. Equals to group-by-key + * numbers usually. + * @param valueCols + * The columns to be stored in the value + * @param distinctColIndices + * column indices for distinct aggregates + * @param outputKeyColumnNames + * The output key columns names + * @param outputValueColumnNames + * The output value columns names + * @param tag + * The tag for this reducesink + * @param numPartitionFields + * The first numPartitionFields of keyCols will be partition columns. + * If numPartitionFields=-1, then partition randomly. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The YSmartReduceSinkDesc object. 
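
The numPartitionFields convention documented below (use every key column, a prefix of the key columns, or random partitioning for -1) can be sketched over plain column names; "rand()" here is only a placeholder for the random-partitioning expression that is actually built:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: the partition-column selection rule, expressed over plain
// column names instead of ExprNodeDesc objects.
class PartitionColumnRule {
    static List<String> choosePartitionCols(List<String> keyCols, int numPartitionFields) {
        if (numPartitionFields >= keyCols.size()) {
            return keyCols;                                // partition on every key column
        } else if (numPartitionFields >= 0) {
            return keyCols.subList(0, numPartitionFields); // partition on a key prefix
        } else {
            List<String> random = new ArrayList<>();
            random.add("rand()");                          // -1: partition randomly
            return random;
        }
    }

    public static void main(String[] args) {
        List<String> keys = List.of("k1", "k2", "k3");
        System.out.println(choosePartitionCols(keys, 5));   // [k1, k2, k3]
        System.out.println(choosePartitionCols(keys, 2));   // [k1, k2]
        System.out.println(choosePartitionCols(keys, -1));  // [rand()]
    }
}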
+ */ + public static CorrelationReduceSinkDesc getReduceSinkDesc( + ArrayList keyCols, int numKeys, + ArrayList valueCols, + List> distinctColIndices, + List outputKeyColumnNames, List outputValueColumnNames, + boolean includeKey, int tag, + int numPartitionFields, int numReducers) throws SemanticException { + ArrayList partitionCols = null; + + if (numPartitionFields >= keyCols.size()) { + partitionCols = keyCols; + } else if (numPartitionFields >= 0) { + partitionCols = new ArrayList(numPartitionFields); + for (int i = 0; i < numPartitionFields; i++) { + partitionCols.add(keyCols.get(i)); + } + } else { + // numPartitionFields = -1 means random partitioning + partitionCols = new ArrayList(1); + partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor + .getFuncExprNodeDesc("rand")); + } + + StringBuilder order = new StringBuilder(); + for (int i = 0; i < keyCols.size(); i++) { + order.append("+"); + } + + TableDesc keyTable = null; + TableDesc valueTable = null; + ArrayList outputKeyCols = new ArrayList(); + ArrayList outputValCols = new ArrayList(); + if (includeKey) { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnListWithLength( + keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""), + order.toString()); + outputKeyCols.addAll(outputKeyColumnNames); + } else { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList( + keyCols, "reducesinkkey"),order.toString()); + for (int i = 0; i < keyCols.size(); i++) { + outputKeyCols.add("reducesinkkey" + i); + } + } + valueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueCols, outputValueColumnNames, 0, "")); + outputValCols.addAll(outputValueColumnNames); + + return new CorrelationReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols, + distinctColIndices, outputValCols, + tag, partitionCols, numReducers, keyTable, + valueTable); + } + +} Index: ql/src/test/results/clientpositive/show_functions.q.out =================================================================== --- ql/src/test/results/clientpositive/show_functions.q.out (revision 1172874) +++ ql/src/test/results/clientpositive/show_functions.q.out (working copy) @@ -186,6 +186,7 @@ corr cos count +count_distinct covar_pop covar_samp create_union Index: ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java =================================================================== --- ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (revision 1172874) +++ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (working copy) @@ -28,7 +28,12 @@ LATERALVIEWJOIN(14), LATERALVIEWFORWARD(15), HASHTABLESINK(16), - HASHTABLEDUMMY(17); + HASHTABLEDUMMY(17), + CORRELATIONCOMPOSITE(18), + CORRELATIONREDUCESINK(19), + CORRELATIONMANUALFORWARD(20), + CORRELATIONDISPATCH(21), + CORRELATIONFAKEREDUCESINK(22); private final int value; @@ -47,7 +52,7 @@ * Find a the enum type by its integer value, as defined in the Thrift IDL. * @return null if the value is not found. 
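getReduceSinkDesc above picks the partition columns from the key columns in three ways: every key column when numPartitionFields >= keyCols.size(), the first numPartitionFields key columns when the value is non-negative, and a single rand() expression when it is -1, which gives random partitioning. A minimal, self-contained sketch of that selection rule, with plain strings standing in for Hive's ExprNodeDesc objects (the class and variable names below are illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionColsSketch {

  // Mirrors the selection rule of getReduceSinkDesc(): -1 means "partition randomly".
  static List<String> choosePartitionCols(List<String> keyCols, int numPartitionFields) {
    if (numPartitionFields >= keyCols.size()) {
      return new ArrayList<String>(keyCols);                                // all key columns
    } else if (numPartitionFields >= 0) {
      return new ArrayList<String>(keyCols.subList(0, numPartitionFields)); // first N key columns
    } else {
      return new ArrayList<String>(Arrays.asList("rand()"));                // stand-in for the rand() expression
    }
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("key", "value");
    System.out.println(choosePartitionCols(keys, 5));   // [key, value]
    System.out.println(choosePartitionCols(keys, 1));   // [key]
    System.out.println(choosePartitionCols(keys, -1));  // [rand()]
  }
}

The order string assembled in the same method is simply one "+" per key column, i.e. every key column is sorted ascending.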
*/ - public static OperatorType findByValue(int value) { + public static OperatorType findByValue(int value) { switch (value) { case 0: return JOIN; @@ -85,6 +90,16 @@ return HASHTABLESINK; case 17: return HASHTABLEDUMMY; + case 18: + return CORRELATIONCOMPOSITE; + case 19: + return CORRELATIONREDUCESINK; + case 20: + return CORRELATIONMANUALFORWARD; + case 21: + return CORRELATIONDISPATCH; + case 22: + return CORRELATIONFAKEREDUCESINK; default: return null; } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (working copy) @@ -42,6 +42,11 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.UDTFDesc; import org.apache.hadoop.hive.ql.plan.UnionDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationDispatchDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationManualForwardDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReduceSinkDesc; /** * OperatorFactory. @@ -91,6 +96,16 @@ HashTableDummyOperator.class)); opvec.add(new OpTuple(HashTableSinkDesc.class, HashTableSinkOperator.class)); + opvec.add(new OpTuple(CorrelationCompositeDesc.class, + CorrelationCompositeOperator.class)); + opvec.add(new OpTuple(CorrelationManualForwardDesc.class, + CorrelationManualForwardOperator.class)); + opvec.add(new OpTuple(CorrelationReduceSinkDesc.class, + CorrelationReduceSinkOperator.class)); + opvec.add(new OpTuple(CorrelationDispatchDesc.class, + CorrelationDispatchOperator.class)); + opvec.add(new OpTuple(CorrelationFakeReduceSinkDesc.class, + CorrelationFakeReduceSinkOperator.class)); } public static Operator get(Class opClass) { Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java (revision 0) @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +/** + * Correlation Manual ForwardDesc. 
+ * + */ +@Explain(displayName = "Correlation Manual Forward") +public class CorrelationManualForwardDesc implements Serializable { + private static final long serialVersionUID = 1L; + + public CorrelationManualForwardDesc() { + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy) @@ -44,7 +44,12 @@ * @param hiveConf */ public void initialize(HiveConf hiveConf) { + CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer(); transformations = new ArrayList(); + // Add correlation optimizer for first phase query plan tree analysis + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)){ + transformations.add(correlationOptimizer); + } // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) { @@ -74,6 +79,10 @@ if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) { transformations.add(new ReduceSinkDeDuplication()); } + // Correlation discovery and query plan tree transformation phase + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)){ + transformations.add(correlationOptimizer); + } } /** Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCountDistinct.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCountDistinct.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCountDistinct.java (revision 0) @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
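Going back to the Optimizer.java hunk above: the same CorrelationOptimizer instance is added to the transformation list twice, once at the head so it can analyze the operator tree before the other rewrites run, and once at the tail where the actual correlation detection and plan transformation happen. A hedged, plain-Java sketch of that two-pass pattern; the Transform interface and class names here are illustrative stand-ins, not the Hive API:

import java.util.ArrayList;
import java.util.List;

public class TwoPhaseSketch {
  interface Transform { void transform(StringBuilder plan); }

  // Keeps state between its two invocations, the way a twice-registered optimizer can.
  static class CorrelationLike implements Transform {
    private boolean analyzed = false;
    public void transform(StringBuilder plan) {
      if (!analyzed) {
        analyzed = true;
        plan.append(" [analyzed]");     // phase 1: inspect the untouched plan
      } else {
        plan.append(" [correlated]");   // phase 2: rewrite after the other optimizers ran
      }
    }
  }

  public static void main(String[] args) {
    CorrelationLike correlation = new CorrelationLike();
    List<Transform> transformations = new ArrayList<Transform>();
    transformations.add(correlation);                    // first registration
    transformations.add(new Transform() {                // stand-in for the other optimizers
      public void transform(StringBuilder plan) { plan.append(" [pruned]"); }
    });
    transformations.add(correlation);                    // second registration, same instance
    StringBuilder plan = new StringBuilder("plan");
    for (Transform t : transformations) { t.transform(plan); }
    System.out.println(plan);  // plan [analyzed] [pruned] [correlated]
  }
}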
+ */ +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.LongWritable; + +//TODO: Need test +/** + * This class implements the COUNT_DISTINCT aggregation function as in SQL. + */ +@Description(name = "count_distinct", + value = "_FUNC_(*) - Returns the number of rows for " + + "which the supplied expression(s) are unique and non-NULL.") + +public class GenericUDAFCountDistinct implements GenericUDAFResolver2 { + + private static final Log LOG = LogFactory.getLog(GenericUDAFCountDistinct.class.getName()); + + @Override + public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) + throws SemanticException { + // This method implementation is preserved for backward compatibility. + return new GenericUDAFCountEvaluator(); + } + + @Override + public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo) + throws SemanticException { + + TypeInfo[] parameters = paramInfo.getParameters(); + + if (parameters.length == 0) { + if (!paramInfo.isAllColumns()) { + throw new UDFArgumentException("Argument expected"); + } + assert !paramInfo.isDistinct() : "DISTINCT not supported with *"; + } else { + if (parameters.length > 1 && !paramInfo.isDistinct()) { + throw new UDFArgumentException("DISTINCT keyword must be specified"); + } + assert !paramInfo.isAllColumns() : "* not supported in expression list"; + } + + return new GenericUDAFCountEvaluator().setCountAllColumns( + paramInfo.isAllColumns()); + } + + /** + * GenericUDAFCountEvaluator. 
+ * + */ + public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator { + private boolean countAllColumns = false; + private LongObjectInspector partialCountAggOI; + private LongWritable result; + + private ObjectInspector[] inputOI; + + @Override + public ObjectInspector init(Mode m, ObjectInspector[] parameters) + throws HiveException { + super.init(m, parameters); + inputOI = new ObjectInspector[parameters.length]; + for(int i=0; i bufferSet = new HashSet(); + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + CountDistinctAgg buffer = new CountDistinctAgg(); + reset(buffer); + return buffer; + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + ((CountDistinctAgg) agg).value = 0; + ((CountDistinctAgg) agg).bufferSet.clear(); + } + + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) + throws HiveException { + // parameters == null means the input table/split is empty + if (parameters == null) { + return; + } + if (countAllColumns) { + throw new UDFArgumentException("count_distinct cannot count all columns"); + } else { + assert parameters.length > 0; + boolean countThisRow = true; + Object[] objs = new Object[parameters.length]; + for(int i=0; i + implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * The evaluators for the key columns. Key columns decide the sort order on + * the reducer side. Key columns are passed to the reducer in the "key". + */ + protected transient ExprNodeEvaluator[] keyEval; + /** + * The evaluators for the value columns. Value columns are passed to reducer + * in the "value". + */ + protected transient ExprNodeEvaluator[] valueEval; + /** + * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in + * Hive language). Partition columns decide the reducer that the current row + * goes to. Partition columns are not passed to reducer. 
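The CountDistinctAgg buffer above pairs a running count with a HashSet of parameter tuples that have already been seen, so a row should only increment the count the first time its values appear, and rows with NULL arguments are not counted (per the @Description). A self-contained sketch of that idea with plain Java collections; it illustrates the buffer logic only and is not the Hive evaluator, which works on ObjectInspector-backed objects:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CountDistinctSketch {
  static class Agg {
    long value = 0;                                 // running distinct count
    Set<List<Object>> seen = new HashSet<>();       // tuples already counted
  }

  // One call per input row, like iterate(): a NULL in any argument skips the row.
  static void iterate(Agg agg, Object... parameters) {
    for (Object p : parameters) {
      if (p == null) {
        return;                                     // non-NULL requirement from the @Description
      }
    }
    if (agg.seen.add(Arrays.asList(parameters))) {
      agg.value++;                                  // first time this tuple shows up
    }
  }

  public static void main(String[] args) {
    Agg agg = new Agg();
    iterate(agg, "a", 1);
    iterate(agg, "a", 1);     // duplicate, not counted again
    iterate(agg, "b", null);  // NULL argument, skipped
    iterate(agg, "b", 2);
    System.out.println(agg.value);  // 2
  }
}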
+ */ + protected transient ExprNodeEvaluator[] partitionEval; + + // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is + // ready + transient Serializer keySerializer; + transient boolean keyIsText; + transient Serializer valueSerializer; + transient int tag; + transient byte[] tagByte = new byte[1]; + transient protected int numDistributionKeys; + transient protected int numDistinctExprs; + private final ArrayList operationPathTags = new ArrayList(); // operation path tags + private final byte[] operationPathTagsByte = new byte[1]; + + public void setOperationPathTags(ArrayList operationPathTags){ + this.operationPathTags.addAll(operationPathTags); + int operationPathTagsInt = 0; + int tmp = 1; + for (Integer operationPathTag: operationPathTags){ + operationPathTagsInt += tmp << operationPathTag.intValue(); + } + operationPathTagsByte[0] = (byte) operationPathTagsInt; + } + + public ArrayList getOperationPathTags(){ + return this.operationPathTags; + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + + try { + keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getKeyCols()) { + keyEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + numDistributionKeys = conf.getNumDistributionKeys(); + distinctColIndices = conf.getDistinctColumnIndices(); + numDistinctExprs = distinctColIndices.size(); + + valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getValueCols()) { + valueEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getPartitionCols()) { + partitionEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + tag = conf.getTag(); + tagByte[0] = (byte) tag; + LOG.info("Using tag = " + tag); + + TableDesc keyTableDesc = conf.getKeySerializeInfo(); + keySerializer = (Serializer) keyTableDesc.getDeserializerClass() + .newInstance(); + keySerializer.initialize(null, keyTableDesc.getProperties()); + keyIsText = keySerializer.getSerializedClass().equals(Text.class); + + TableDesc valueTableDesc = conf.getValueSerializeInfo(); + valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() + .newInstance(); + valueSerializer.initialize(null, valueTableDesc.getProperties()); + + firstRow = true; + + if (conf.getStaticOperationPathTags() != null){ + setOperationPathTags(conf.getStaticOperationPathTags()); + } + + initializeChildren(hconf); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + transient InspectableObject tempInspectableObject = new InspectableObject(); + transient HiveKey keyWritable = new HiveKey(); + transient Writable value; + + transient StructObjectInspector keyObjectInspector; + transient StructObjectInspector valueObjectInspector; + transient ObjectInspector[] partitionObjectInspectors; + + transient Object[][] cachedKeys; + transient Object[] cachedValues; + transient List> distinctColIndices; + + boolean firstRow; + + transient Random random; + + /** + * Initializes array of ExprNodeEvaluator. Adds Union field for distinct + * column indices for group by. + * Puts the return values into a StructObjectInspector with output column + * names. 
+ * + * If distinctColIndices is empty, the object inspector is same as + * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} + */ + protected static StructObjectInspector initEvaluatorsAndReturnStruct( + ExprNodeEvaluator[] evals, List> distinctColIndices, + List outputColNames, + int length, ObjectInspector rowInspector) + throws HiveException { + int inspectorLen = evals.length > length ? length + 1 : evals.length; + List sois = new ArrayList(inspectorLen); + + // keys + ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector); + sois.addAll(Arrays.asList(fieldObjectInspectors)); + + if (evals.length > length) { + // union keys + List uois = new ArrayList(); + for (List distinctCols : distinctColIndices) { + List names = new ArrayList(); + List eois = new ArrayList(); + int numExprs = 0; + for (int i : distinctCols) { + names.add(HiveConf.getColumnInternalName(numExprs)); + eois.add(evals[i].initialize(rowInspector)); + numExprs++; + } + uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois)); + } + UnionObjectInspector uoi = + ObjectInspectorFactory.getStandardUnionObjectInspector(uois); + sois.add(uoi); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois ); + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + try { + ObjectInspector rowInspector = inputObjInspectors[tag]; + if (firstRow) { + firstRow = false; + keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, + distinctColIndices, + conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); + valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf + .getOutputValueColumnNames(), rowInspector); + partitionObjectInspectors = initEvaluators(partitionEval, rowInspector); + int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; + int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : + numDistributionKeys; + cachedKeys = new Object[numKeys][keyLen]; + cachedValues = new Object[valueEval.length]; + } + + // Evaluate the HashCode + int keyHashCode = 0; + if (partitionEval.length == 0) { + // If no partition cols, just distribute the data uniformly to provide + // better + // load balance. If the requirement is to have a single reducer, we + // should set + // the number of reducers to 1. + // Use a constant seed to make the code deterministic. 
+ if (random == null) { + random = new Random(12345); + } + keyHashCode = random.nextInt(); + } else { + for (int i = 0; i < partitionEval.length; i++) { + Object o = partitionEval[i].evaluate(row); + keyHashCode = keyHashCode * 31 + + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); + } + } + + // Evaluate the value + for (int i = 0; i < valueEval.length; i++) { + cachedValues[i] = valueEval[i].evaluate(row); + } + // Serialize the value + value = valueSerializer.serialize(cachedValues, valueObjectInspector); + + // Evaluate the keys + Object[] distributionKeys = new Object[numDistributionKeys]; + for (int i = 0; i < numDistributionKeys; i++) { + distributionKeys[i] = keyEval[i].evaluate(row); + } + + if (numDistinctExprs > 0) { + // with distinct key(s) + for (int i = 0; i < numDistinctExprs; i++) { + System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys); + Object[] distinctParameters = + new Object[distinctColIndices.get(i).size()]; + for (int j = 0; j < distinctParameters.length; j++) { + distinctParameters[j] = + keyEval[distinctColIndices.get(i).get(j)].evaluate(row); + } + cachedKeys[i][numDistributionKeys] = + new StandardUnion((byte)i, distinctParameters); + } + } else { + // no distinct key + System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys); + } + // Serialize the keys and append the tag + for (int i = 0; i < cachedKeys.length; i++) { + if (keyIsText) { + Text key = (Text) keySerializer.serialize(cachedKeys[i], + keyObjectInspector); + if (tag == -1) { + keyWritable.set(key.getBytes(), 0, key.getLength()); + } else { + int keyLength = key.getLength(); + keyWritable.setSize(keyLength + 2); + System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } + } else { + // Must be BytesWritable + BytesWritable key = (BytesWritable) keySerializer.serialize( + cachedKeys[i], keyObjectInspector); + if (tag == -1) { + keyWritable.set(key.getBytes(), 0, key.getLength()); + } else { + int keyLength = key.getLength(); + keyWritable.setSize(keyLength + 2); + System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } + } + keyWritable.setHashCode(keyHashCode); + if (out != null) { + out.collect(keyWritable, value); + // Since this is a terminal operator, update counters explicitly - + // forward is not called + if (counterNameToEnum != null) { + ++outputRows; + if (outputRows % 1000 == 0) { + incrCounter(numOutputRowsCntr, outputRows); + outputRows = 0; + } + } + } + } + } catch (SerDeException e) { + throw new HiveException(e); + } catch (IOException e) { + throw new HiveException(e); + } + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CRS"); + } + + @Override + public OperatorType getType() { + return OperatorType.CORRELATIONREDUCESINK; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; 
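Two details of the CorrelationReduceSinkOperator just shown are easy to miss: setOperationPathTags folds the operation path tags into one byte as a bitmask (bit i set means operation path i applies to the row), and the tag != -1 branch of processOp appends that byte plus the reduce sink tag byte after the serialized key, so the key grows by exactly two bytes. A small runnable sketch of both steps under those assumptions, with a plain byte[] standing in for HiveKey:

import java.util.Arrays;
import java.util.List;

public class OperationPathTagSketch {
  // Mirrors setOperationPathTags(): one bit per operation path tag.
  static byte packOperationPathTags(List<Integer> tags) {
    int packed = 0;
    for (int tag : tags) {
      packed += 1 << tag;          // same shift-and-add as the operator
    }
    return (byte) packed;
  }

  // Mirrors the tag != -1 branch of processOp(): serialized key + path-tags byte + tag byte.
  static byte[] appendTags(byte[] serializedKey, byte pathTags, byte tag) {
    byte[] out = Arrays.copyOf(serializedKey, serializedKey.length + 2);
    out[serializedKey.length] = pathTags;
    out[serializedKey.length + 1] = tag;
    return out;
  }

  public static void main(String[] args) {
    byte pathTags = packOperationPathTags(Arrays.asList(0, 2));  // paths 0 and 2 -> 0b0000_0101
    byte[] key = appendTags(new byte[] {10, 20}, pathTags, (byte) 1);
    System.out.println(Arrays.toString(key));  // [10, 20, 5, 1]
  }
}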
+import org.apache.hadoop.hive.ql.exec.CorrelationReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -58,6 +59,7 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.CorrelationReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; @@ -114,6 +116,7 @@ } if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } assert currTopOp != null; @@ -133,6 +136,58 @@ opProcCtx.setCurrAliasId(currAliasId); } + /** + * Initialize the current plan by adding it to root tasks. + * + * @param op + * the reduce sink operator encountered + * @param opProcCtx + * processing context + */ + public static void initPlan(CorrelationReduceSinkOperator op, GenMRProcContext opProcCtx) + throws SemanticException { + Operator reducer = op.getChildOperators().get(0); + Map, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx(); + GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0)); + Task currTask = mapredCtx.getCurrTask(); + MapredWork plan = (MapredWork) currTask.getWork(); + HashMap, Task> opTaskMap = + opProcCtx.getOpTaskMap(); + Operator currTopOp = opProcCtx.getCurrTopOp(); + + opTaskMap.put(reducer, currTask); + plan.setReducer(reducer); + CorrelationReduceSinkDesc desc = op.getConf(); + + plan.setNumReduceTasks(desc.getNumReducers()); + + List> rootTasks = opProcCtx.getRootTasks(); + + if (!rootTasks.contains(currTask)) { + rootTasks.add(currTask); + } + + plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(true); + + + assert currTopOp != null; + List> seenOps = opProcCtx.getSeenOps(); + String currAliasId = opProcCtx.getCurrAliasId(); + + if (!seenOps.contains(currTopOp)) { + seenOps.add(currTopOp); + setTaskPlan(currAliasId, currTopOp, plan, false, opProcCtx); + } + + currTopOp = null; + currAliasId = null; + + opProcCtx.setCurrTask(currTask); + opProcCtx.setCurrTopOp(currTopOp); + opProcCtx.setCurrAliasId(currAliasId); + } + public static void initMapJoinPlan( Operator op, GenMRProcContext ctx, boolean readInputMapJoin, boolean readInputUnion, boolean setReducer, int pos) throws SemanticException { @@ -178,6 +233,7 @@ opTaskMap.put(reducer, currTask); if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf(); plan.setNumReduceTasks(desc.getNumReducers()); @@ -308,6 +364,7 @@ if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } initUnionPlan(opProcCtx, currTask, false); @@ -495,6 +552,38 @@ } /** + * Split the current plan by creating a temporary destination. 
+ * + * @param op + * the reduce sink operator encountered + * @param opProcCtx + * processing context + */ + public static void splitPlan(CorrelationReduceSinkOperator op, GenMRProcContext opProcCtx) + throws SemanticException { + // Generate a new task + ParseContext parseCtx = opProcCtx.getParseCtx(); + MapredWork cplan = getMapRedWork(parseCtx); + Task redTask = TaskFactory.get(cplan, parseCtx + .getConf()); + Operator reducer = op.getChildOperators().get(0); + + // Add the reducer + cplan.setReducer(reducer); + CorrelationReduceSinkDesc desc = op.getConf(); + + cplan.setNumReduceTasks(new Integer(desc.getNumReducers())); + + HashMap, Task> opTaskMap = + opProcCtx.getOpTaskMap(); + opTaskMap.put(reducer, redTask); + Task currTask = opProcCtx.getCurrTask(); + + splitTasks(op, currTask, redTask, opProcCtx, true, false, 0); + opProcCtx.getRootOps().add(op); + } + + /** * set the current task in the mapredWork. * * @param alias_id @@ -823,7 +912,16 @@ tagToSchema.add(null); } tagToSchema.set(tag, rs.getConf().getValueSerializeInfo()); - } else { + } else if (topOp instanceof CorrelationReduceSinkOperator) { + CorrelationReduceSinkOperator rs = (CorrelationReduceSinkOperator) topOp; + plan.setKeyDesc(rs.getConf().getKeySerializeInfo()); + int tag = Math.max(0, rs.getConf().getTag()); + List tagToSchema = plan.getTagToValueDesc(); + while (tag + 1 > tagToSchema.size()) { + tagToSchema.add(null); + } + tagToSchema.set(tag, rs.getConf().getValueSerializeInfo()); + }else { List> children = topOp.getChildOperators(); if (children != null) { for (Operator op : children) { @@ -983,6 +1081,7 @@ // dependent on the redTask if (reducer.getClass() == JoinOperator.class) { cplan.setNeedsTagging(true); + cplan.setNeedsOperationPathTagging(false); } } Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 1172874) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy) @@ -279,6 +279,7 @@ private void populateMapRedPlan3(Table src, Table src2) throws SemanticException { mr.setNumReduceTasks(Integer.valueOf(5)); mr.setNeedsTagging(true); + mr.setNeedsOperationPathTagging(false); ArrayList outputColumns = new ArrayList(); for (int i = 0; i < 2; i++) { outputColumns.add("_col" + i); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0) @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.io.LongWritable; + +/** + * Correlation composite operator implementation. + **/ +public class CorrelationCompositeOperator extends Operator implements Serializable{ + + static final private Log LOG = LogFactory.getLog(Driver.class.getName()); + static final private LogHelper console = new LogHelper(LOG); + + public static enum Counter { + FILTERED, PASSED + } + + /** + * + */ + private static final long serialVersionUID = 1L; + + private ArrayList> bottomInternalOperators; + private ArrayList correlationManualForwardOperators; + + private CorrelationReduceSinkOperator correspondingCorrelationReduceSinkOperators; + + private transient final LongWritable filtered_count, passed_count; + + + public CorrelationCompositeOperator(){ + super(); + filtered_count = new LongWritable(); + passed_count = new LongWritable(); + } + + @Override + public OperatorType getType() { + return OperatorType.CORRELATIONCOMPOSITE; + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CCO"); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + correspondingCorrelationReduceSinkOperators = conf.getCorrespondingCorrelationReduceSinkOperator(); + bottomInternalOperators = conf.getBottomInternalOperators(); + correlationManualForwardOperators = conf.getCorrelationManualForwardOperators(); + allOperationPathTags = conf.getAllOperationPathTags(); + statsMap.put(Counter.FILTERED, filtered_count); + statsMap.put(Counter.PASSED, passed_count); + // initialize internal operators + for (Operator op:bottomInternalOperators){ + op.initialize(hconf, inputObjInspectors); + } + //initialize its children + initializeChildren(hconf); + } + + private int[] allOperationPathTags; + + @Override + public void processOp(Object row, int tag) throws HiveException { + ArrayList operationPathTags = new ArrayList(); + boolean isForward = false; + for (Operator op: bottomInternalOperators){ + op.process(row, tag); + } + ArrayList rows = new ArrayList(); + int[] tags = new int[bottomInternalOperators.size()]; + int i = 0; + for (CorrelationManualForwardOperator ymf: correlationManualForwardOperators){ + rows.add(ymf.getRow()); + tags[i] = ymf.getTag(); + i++; + } + if (rows.size() > 0){ + i = 0; + for (Object r: rows){ + if (r != null){ + operationPathTags.add(allOperationPathTags[i]); + isForward = true; + } + i++; + } + assert correspondingCorrelationReduceSinkOperators != null; + correspondingCorrelationReduceSinkOperators.setOperationPathTags(operationPathTags); + } + if (isForward){ + passed_count.set(passed_count.get() + 1); + forward(row, inputObjInspectors[tag]); + } + else{ + filtered_count.set(filtered_count.get() + 1); + } + for (CorrelationManualForwardOperator ymf: correlationManualForwardOperators){ + ymf.clean(); + } + + } + +} Index: 
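The CorrelationCompositeOperator above, in effect, asks which of the shared operation paths rooted at the same table scan still want the current row after their own bottom operators have run: it forwards the row once if at least one path keeps it, and passes the matching path tags to the downstream CorrelationReduceSinkOperator. A compact stand-alone sketch of that decision, with simple predicates standing in for the bottom internal operator paths (all names here are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class CompositeSketch {
  // Each predicate plays the role of one bottom internal operator path (e.g. a filter).
  static List<Integer> passingPathTags(String row, List<Predicate<String>> paths, int[] allPathTags) {
    List<Integer> tags = new ArrayList<Integer>();
    for (int i = 0; i < paths.size(); i++) {
      if (paths.get(i).test(row)) {
        tags.add(allPathTags[i]);   // this operation path still wants the row
      }
    }
    return tags;                    // empty list == row filtered, nothing forwarded
  }

  public static void main(String[] args) {
    List<Predicate<String>> paths = Arrays.asList(
        r -> r.startsWith("a"),     // path 0: e.g. WHERE key LIKE 'a%'
        r -> r.length() > 3);       // path 1: e.g. WHERE length(key) > 3
    int[] allPathTags = {0, 1};
    System.out.println(passingPathTags("apple", paths, allPathTags)); // [0, 1] -> forward once
    System.out.println(passingPathTags("kiwi",  paths, allPathTags)); // [1]    -> forward once
    System.out.println(passingPathTags("ox",    paths, allPathTags)); // []     -> filtered
  }
}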
ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy) @@ -192,6 +192,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFJSONTuple; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFParseUrlTuple; import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCountDistinct; import org.apache.hadoop.hive.ql.udf.xml.GenericUDFXPath; import org.apache.hadoop.hive.ql.udf.xml.UDFXPathBoolean; import org.apache.hadoop.hive.ql.udf.xml.UDFXPathDouble; @@ -381,6 +382,7 @@ registerGenericUDAF("sum", new GenericUDAFSum()); registerGenericUDAF("count", new GenericUDAFCount()); + registerGenericUDAF("count_distinct", new GenericUDAFCountDistinct()); registerGenericUDAF("avg", new GenericUDAFAverage()); registerGenericUDAF("std", new GenericUDAFStd()); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java (revision 0) @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Fake Reduce Sink Operator sends output to another operator (e.g. JOIN or GBY). + **/ +public class CorrelationFakeReduceSinkOperator extends Operator + implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * The evaluators for the key columns. Key columns decide the sort order on + * the reducer side. Key columns are passed to the reducer in the "key". + */ + protected transient ExprNodeEvaluator[] keyEval; + /** + * The evaluators for the value columns. Value columns are passed to reducer + * in the "value". + */ + protected transient ExprNodeEvaluator[] valueEval; + /** + * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in + * Hive language). Partition columns decide the reducer that the current row + * goes to. Partition columns are not passed to reducer. 
+ */ + + // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is + // ready + transient Serializer keySerializer; + transient boolean keyIsText; + transient Serializer valueSerializer; + + transient TableDesc keyTableDesc; + transient TableDesc valueTableDesc; + + transient Deserializer inputKeyDeserializer; + + transient SerDe inputValueDeserializer; + + transient int tag; + transient byte[] tagByte; + transient ByteWritable tagWritable; + transient protected int numDistributionKeys; + transient protected int numDistinctExprs; + + transient InspectableObject tempInspectableObject; + transient HiveKey keyWritable; + transient Writable value; + + transient StructObjectInspector keyObjectInspector; + transient StructObjectInspector valueObjectInspector; + transient ObjectInspector[] partitionObjectInspectors; + + transient ObjectInspector outputKeyObjectInspector; + transient ObjectInspector outputValueObjectInspector; + transient ObjectInspector[] outputPartitionObjectInspectors; + + + //transient BytesWritable groupKey; + transient Object[][] cachedKeys; + transient Object[] cachedValues; + transient List> distinctColIndices; + + transient Random random; + + private ArrayList forwardedRow; + private Object keyObject; + private Object valueObject; + + private static String[] fieldNames; + + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + forwardedRow = new ArrayList(3); + tagByte = new byte[1]; + tagWritable = new ByteWritable(); + tempInspectableObject = new InspectableObject(); + keyWritable = new HiveKey(); + assert childOperatorsArray.length == 1; + try { + keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getKeyCols()) { + keyEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + numDistributionKeys = conf.getNumDistributionKeys(); + distinctColIndices = conf.getDistinctColumnIndices(); + numDistinctExprs = distinctColIndices.size(); + + valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getValueCols()) { + valueEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + tag = conf.getTag(); + tagByte[0] = (byte) tag; + tagWritable.set(tagByte[0]); + LOG.info("Using tag = " + tag); + + TableDesc keyTableDesc = conf.getKeySerializeInfo(); + keySerializer = (Serializer) keyTableDesc.getDeserializerClass() + .newInstance(); + keySerializer.initialize(null, keyTableDesc.getProperties()); + keyIsText = keySerializer.getSerializedClass().equals(Text.class); + + inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + .getDeserializerClass(), null); + inputKeyDeserializer.initialize(null, keyTableDesc.getProperties()); + outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector(); + + TableDesc valueTableDesc = conf.getValueSerializeInfo(); + valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() + .newInstance(); + valueSerializer.initialize(null, valueTableDesc.getProperties()); + + inputValueDeserializer = (SerDe) ReflectionUtils.newInstance( + valueTableDesc.getDeserializerClass(), null); + inputValueDeserializer.initialize(null, valueTableDesc + .getProperties()); + outputValueObjectInspector = inputValueDeserializer.getObjectInspector(); + + ObjectInspector rowInspector = 
inputObjInspectors[0]; + + keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, + distinctColIndices, + conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); + valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf + .getOutputValueColumnNames(), rowInspector); + int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; + int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : + numDistributionKeys; + cachedKeys = new Object[numKeys][keyLen]; + cachedValues = new Object[valueEval.length]; + assert cachedKeys.length == 1; + + ArrayList ois = new ArrayList(); + ois.add(outputKeyObjectInspector); + ois.add(outputValueObjectInspector); + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + + LOG.info("Fake ReduceSink inputObjInspectors" + + ((StructObjectInspector) inputObjInspectors[0]).getTypeName()); //Yin + + LOG.info("Fake ReduceSink outputObjInspectors " + + this.getChildOperators().get(0).getParentOperators().indexOf(this) + + " " + ((StructObjectInspector) outputObjInspector).getTypeName()); //Yin + + initializeChildren(hconf); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + /** + * Initializes array of ExprNodeEvaluator. Adds Union field for distinct + * column indices for group by. + * Puts the return values into a StructObjectInspector with output column + * names. + * + * If distinctColIndices is empty, the object inspector is same as + * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} + */ + protected static StructObjectInspector initEvaluatorsAndReturnStruct( + ExprNodeEvaluator[] evals, List> distinctColIndices, + List outputColNames, + int length, ObjectInspector rowInspector) + throws HiveException { + int inspectorLen = evals.length > length ? 
length + 1 : evals.length; + List sois = new ArrayList(inspectorLen); + + // keys + ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector); + sois.addAll(Arrays.asList(fieldObjectInspectors)); + + if (evals.length > length) { + // union keys + List uois = new ArrayList(); + for (List distinctCols : distinctColIndices) { + List names = new ArrayList(); + List eois = new ArrayList(); + int numExprs = 0; + for (int i : distinctCols) { + names.add(HiveConf.getColumnInternalName(numExprs)); + eois.add(evals[i].initialize(rowInspector)); + numExprs++; + } + uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois)); + } + UnionObjectInspector uoi = + ObjectInspectorFactory.getStandardUnionObjectInspector(uois); + sois.add(uoi); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois ); + } + + private BytesWritable groupKey; + + @Override + public void processOp(Object row, int tag) throws HiveException { + + try { + // Evaluate the value + for (int i = 0; i < valueEval.length; i++) { + cachedValues[i] = valueEval[i].evaluate(row); + } + // Serialize the value + + value = valueSerializer.serialize(cachedValues, valueObjectInspector); + + valueObject = inputValueDeserializer.deserialize(value); + + // Evaluate the keys + Object[] distributionKeys = new Object[numDistributionKeys]; + for (int i = 0; i < numDistributionKeys; i++) { + distributionKeys[i] = keyEval[i].evaluate(row); + } + + if (numDistinctExprs > 0) { + // with distinct key(s) + for (int i = 0; i < numDistinctExprs; i++) { + System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys); + Object[] distinctParameters = + new Object[distinctColIndices.get(i).size()]; + for (int j = 0; j < distinctParameters.length; j++) { + distinctParameters[j] = + keyEval[distinctColIndices.get(i).get(j)].evaluate(row); + } + cachedKeys[i][numDistributionKeys] = + new StandardUnion((byte)i, distinctParameters); + } + } else { + // no distinct key + System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys); + } + + + for (int i = 0; i < cachedKeys.length; i++) { + + if (keyIsText) { + Text key = (Text) keySerializer.serialize(cachedKeys[i], + keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + + } else { + // Must be BytesWritable + BytesWritable key = (BytesWritable) keySerializer.serialize( + cachedKeys[i], keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + + } + + if (!keyWritable.equals(groupKey)) { + + try { + keyObject = inputKeyDeserializer.deserialize(keyWritable); + } catch (Exception e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input key from " + + Utilities.formatBinaryString(keyWritable.get(), 0, + keyWritable.getSize()) + " with properties " + + keyTableDesc.getProperties(), e); + } + + if (groupKey == null) { // the first group + groupKey = new BytesWritable(); + } else { + // if its child has not been ended, end it + if(!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())){ + childOperatorsArray[0].endGroup(); + } + } + groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); + + if(!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())){ + childOperatorsArray[0].startGroup(); + childOperatorsArray[0].setGroupKeyObject(keyObject); + childOperatorsArray[0].setBytesWritableGroupKey(groupKey); + } + + } + + forwardedRow.clear(); + forwardedRow.add(keyObject); + forwardedRow.add(valueObject); + 
forwardedRow.add(tagWritable); + forward(forwardedRow, outputObjInspector); + } + } catch (SerDeException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if(!abort){ + if(childOperatorsArray[0].getNumOfClosedParentOperators() == childOperatorsArray[0].getParentOperators().size() - 1){ + childOperatorsArray[0].endGroup(); + } + + } + } + + @Override + public void startGroup() throws HiveException { + // do nothing + } + + @Override + public void endGroup() throws HiveException { + // do nothing + } + + @Override + public void setGroupKeyObject(Object keyObject) { + // do nothing + + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CFReduceSink"); + } + + @Override + public OperatorType getType() { + return OperatorType.CORRELATIONFAKEREDUCESINK; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java (revision 0) @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationManualForwardDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; + +/** + * Correlation Manual Forward Operator. 
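The CorrelationFakeReduceSinkOperator shown above replays reducer behaviour inside the same task: it serializes the key for each row, and whenever the key changes it ends the child operator's current group and starts a new one before forwarding the (key, value, tag) row; closeOp ends the last group. A minimal sketch of that boundary detection; the GroupListener interface below is an illustrative stand-in for the child's startGroup/endGroup calls, not a Hive type:

import java.util.Arrays;
import java.util.List;

public class FakeReduceSinkSketch {
  interface GroupListener {
    void startGroup(String key);
    void endGroup(String key);
    void row(String key, String value);
  }

  // Emits endGroup/startGroup whenever the incoming key differs from the previous one.
  static void replay(List<String[]> keyValuePairs, GroupListener child) {
    String groupKey = null;
    for (String[] kv : keyValuePairs) {
      if (!kv[0].equals(groupKey)) {
        if (groupKey != null) {
          child.endGroup(groupKey);   // previous group is complete
        }
        groupKey = kv[0];
        child.startGroup(groupKey);   // new group begins
      }
      child.row(kv[0], kv[1]);
    }
    if (groupKey != null) {
      child.endGroup(groupKey);       // the last group is closed at the very end, like closeOp()
    }
  }

  public static void main(String[] args) {
    replay(Arrays.asList(new String[] {"k1", "v1"}, new String[] {"k1", "v2"}, new String[] {"k2", "v3"}),
        new GroupListener() {
          public void startGroup(String key) { System.out.println("startGroup " + key); }
          public void endGroup(String key)   { System.out.println("endGroup " + key); }
          public void row(String key, String value) { System.out.println("  row " + key + "=" + value); }
        });
  }
}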
Collect row and forward it by requests + **/ +public class CorrelationManualForwardOperator extends Operator implements + Serializable { + private static final long serialVersionUID = 1L; + + private Object row; + private int tag; + + public Object getRow(){ + return this.row; + } + + public int getTag(){ + return this.tag; + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + this.row = row; + this.tag = tag; + } + + public void forwardIt() throws HiveException { + forward(row, inputObjInspectors[tag]); + } + + public void clean(){ + this.row = null; + this.tag = -1; + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CMF"); + } + + @Override + public OperatorType getType() { + return OperatorType.CORRELATIONMANUALFORWARD; + } + + + +} Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.CorrelationReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.ExecDriver; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -89,6 +90,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.optimizer.CorrelationGenMRRedSink1; import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; import org.apache.hadoop.hive.ql.optimizer.GenMROperator; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext; @@ -179,7 +181,7 @@ private List loadTableWork; private List loadFileWork; private Map joinContext; - private final HashMap topToTable; + private HashMap topToTable; private QB qb; private ASTNode ast; private int destTableId; @@ -200,6 +202,9 @@ private final UnparseTranslator unparseTranslator; private final GlobalLimitCtx globalLimitCtx = new GlobalLimitCtx(); + // The mapping from (RS, GBY) parts to corresponding (MapSide-GBY, RS, GBY) + Map groupbyRegular2MapSide; + //prefix for column names auto generated by hive private final String autogenColAliasPrfxLbl; private final boolean autogenColAliasPrfxIncludeFuncName; @@ -279,6 +284,7 @@ HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL); autogenColAliasPrfxIncludeFuncName = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME); + groupbyRegular2MapSide = new HashMap(); } @Override @@ -297,6 +303,8 @@ opParseCtx.clear(); groupOpToInputTables.clear(); prunedPartitions.clear(); + topToTable.clear(); + groupbyRegular2MapSide.clear(); } public void init(ParseContext pctx) { @@ -304,6 +312,7 @@ opToPartList = pctx.getOpToPartList(); opToSamplePruner = pctx.getOpToSamplePruner(); topOps = pctx.getTopOps(); + topToTable = pctx.getTopToTable(); topSelOps = pctx.getTopSelOps(); opParseCtx = pctx.getOpParseCtx(); loadTableWork = pctx.getLoadTableWork(); @@ -318,6 +327,7 @@ groupOpToInputTables = pctx.getGroupOpToInputTables(); prunedPartitions = pctx.getPrunedPartitions(); setLineageInfo(pctx.getLineageInfo()); + groupbyRegular2MapSide = 
pctx.getGroupbyRegular2MapSide(); } public ParseContext getParseContext() { @@ -325,7 +335,8 @@ topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, + groupbyRegular2MapSide); } @SuppressWarnings("nls") @@ -2868,6 +2879,7 @@ ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false); reduceSinkOutputRowResolver.putExpression(parameter, colInfo); numExprs++; + colExprMap.put(colInfo.getInternalName(), expr); } distinctColIndices.add(distinctIndices); } @@ -2885,15 +2897,18 @@ for (int i = 1; i < value.getChildCount(); i++) { ASTNode parameter = (ASTNode) value.getChild(i); if (reduceSinkOutputRowResolver.getExpression(parameter) == null) { - reduceValues.add(genExprNodeDesc(parameter, - reduceSinkInputRowResolver)); + ExprNodeDesc expr = genExprNodeDesc(parameter, + reduceSinkInputRowResolver); + reduceValues.add(expr); outputValueColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." + getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field, + ColumnInfo colInfo = new ColumnInfo(field, reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, - false)); + false); + reduceSinkOutputRowResolver.putExpression(parameter, colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } } @@ -2905,14 +2920,18 @@ TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get( inputField).getType(); - reduceValues.add(new ExprNodeColumnDesc(type, - getColumnInternalName(inputField), "", false)); + ExprNodeDesc expr = new ExprNodeColumnDesc(type, + getColumnInternalName(inputField), "", false); + reduceValues.add(expr); inputField++; outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." 
+ getColumnInternalName(reduceValues.size() - 1); + ColumnInfo colInfo = new ColumnInfo(field, + type, null, false); reduceSinkOutputRowResolver.putExpression(entry.getValue(), - new ColumnInfo(field, type, null, false)); + colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } @@ -5861,7 +5880,8 @@ // insert a select operator here used by the ColumnPruner to reduce // the data to shuffle curr = insertSelectAllPlanForGroupBy(dest, curr); - if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + !conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)){ if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { curr = genGroupByPlanMapAggr1MR(dest, qb, curr); } else { @@ -5870,7 +5890,19 @@ } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { curr = genGroupByPlan2MR(dest, qb, curr); } else { + Operator mapSide = null; + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)){ + mapSide = genGroupByPlanMapAggr1MR(dest, qb, curr); + mapSide = (Operator)((Operator)mapSide.getParentOperators().get(0)).getParentOperators().get(0); + curr.getChildOperators().remove(mapSide); + } curr = genGroupByPlan1MR(dest, qb, curr); + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)){ + groupbyRegular2MapSide.put((ReduceSinkOperator )curr.getParentOperators().get(0), + (GroupByOperator)mapSide); + } } } @@ -6948,6 +6980,10 @@ MapJoinFactory.getMapJoinMapJoin()); opRules.put(new RuleRegExp(new String("R11"), "MAPJOIN%SEL%"), MapJoinFactory.getMapJoin()); + opRules.put(new RuleRegExp(new String("R12"), "TS%.*CRS%"), + new CorrelationGenMRRedSink1()); + opRules.put(new RuleRegExp(new String("R13"), "CRS%.*RS%"), + new GenMRRedSink2()); // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along @@ -7132,6 +7168,10 @@ topOp.setChildOperators(null); } + if (topOp instanceof CorrelationReduceSinkOperator) { + topOp.setChildOperators(null); + } + if (topOp.getChildOperators() == null) { return; } @@ -7247,7 +7287,8 @@ opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, + groupbyRegular2MapSide); Optimizer optm = new Optimizer(); optm.setPctx(pCtx); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java (revision 0) @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.List; + +/** + * Correlation Fake ReduceSinkDesc. + * + */ +@Explain(displayName = "Correlation Fake Reduce Output Operator") +public class CorrelationFakeReduceSinkDesc implements Serializable { + private static final long serialVersionUID = 1L; + /** + * Key columns are passed to reducer in the "key". + */ + private java.util.ArrayList keyCols; + private java.util.ArrayList outputKeyColumnNames; + private List> distinctColumnIndices; + /** + * Value columns are passed to reducer in the "value". + */ + private java.util.ArrayList valueCols; + private java.util.ArrayList outputValueColumnNames; + /** + * Describe how to serialize the key. + */ + private TableDesc keySerializeInfo; + /** + * Describe how to serialize the value. + */ + private TableDesc valueSerializeInfo; + + /** + * The tag for this reducesink descriptor. + */ + private int tag; + + /** + * Number of distribution keys. + */ + private int numDistributionKeys; + + /** + * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). + * Partition columns decide the reducer that the current row goes to. + * Partition columns are not passed to reducer. + */ + private java.util.ArrayList partitionCols; + + private int numReducers; + + public CorrelationFakeReduceSinkDesc() { + } + + public CorrelationFakeReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this.keyCols = keyCols; + this.numDistributionKeys = numDistributionKeys; + this.valueCols = valueCols; + this.outputKeyColumnNames = outputKeyColumnNames; + this.outputValueColumnNames = outputValueColumnNames; + this.tag = tag; + this.numReducers = numReducers; + this.partitionCols = partitionCols; + this.keySerializeInfo = keySerializeInfo; + this.valueSerializeInfo = valueSerializeInfo; + this.distinctColumnIndices = distinctColumnIndices; + } + + public CorrelationFakeReduceSinkDesc(ReduceSinkDesc reduceSinkDesc){ + this.keyCols = reduceSinkDesc.getKeyCols(); + this.numDistributionKeys = reduceSinkDesc.getNumDistributionKeys(); + this.valueCols = reduceSinkDesc.getValueCols(); + this.outputKeyColumnNames = reduceSinkDesc.getOutputKeyColumnNames(); + this.outputValueColumnNames = reduceSinkDesc.getOutputValueColumnNames(); + this.tag = reduceSinkDesc.getTag(); + this.numReducers = reduceSinkDesc.getNumReducers(); + this.partitionCols = reduceSinkDesc.getPartitionCols(); + this.keySerializeInfo = reduceSinkDesc.getKeySerializeInfo(); + this.valueSerializeInfo = reduceSinkDesc.getValueSerializeInfo(); + this.distinctColumnIndices = reduceSinkDesc.getDistinctColumnIndices(); + } + + public java.util.ArrayList getOutputKeyColumnNames() { + return outputKeyColumnNames; + } + + public void setOutputKeyColumnNames( + java.util.ArrayList 
outputKeyColumnNames) { + this.outputKeyColumnNames = outputKeyColumnNames; + } + + public java.util.ArrayList getOutputValueColumnNames() { + return outputValueColumnNames; + } + + public void setOutputValueColumnNames( + java.util.ArrayList outputValueColumnNames) { + this.outputValueColumnNames = outputValueColumnNames; + } + + @Explain(displayName = "key expressions") + public java.util.ArrayList getKeyCols() { + return keyCols; + } + + public void setKeyCols(final java.util.ArrayList keyCols) { + this.keyCols = keyCols; + } + + public int getNumDistributionKeys() { + return this.numDistributionKeys; + } + + public void setNumDistributionKeys(int numKeys) { + this.numDistributionKeys = numKeys; + } + + @Explain(displayName = "value expressions") + public java.util.ArrayList getValueCols() { + return valueCols; + } + + public void setValueCols(final java.util.ArrayList valueCols) { + this.valueCols = valueCols; + } + + @Explain(displayName = "Map-reduce partition columns") + public java.util.ArrayList getPartitionCols() { + return partitionCols; + } + + public void setPartitionCols( + final java.util.ArrayList partitionCols) { + this.partitionCols = partitionCols; + } + + @Explain(displayName = "tag") + public int getTag() { + return tag; + } + + public void setTag(int tag) { + this.tag = tag; + } + + /** + * Returns the number of reducers for the map-reduce job. -1 means to decide + * the number of reducers at runtime. This enables Hive to estimate the number + * of reducers based on the map-reduce input data size, which is only + * available right before we start the map-reduce job. + */ + public int getNumReducers() { + return numReducers; + } + + public void setNumReducers(int numReducers) { + this.numReducers = numReducers; + } + + public TableDesc getKeySerializeInfo() { + return keySerializeInfo; + } + + public void setKeySerializeInfo(TableDesc keySerializeInfo) { + this.keySerializeInfo = keySerializeInfo; + } + + public TableDesc getValueSerializeInfo() { + return valueSerializeInfo; + } + + public void setValueSerializeInfo(TableDesc valueSerializeInfo) { + this.valueSerializeInfo = valueSerializeInfo; + } + + /** + * Returns the sort order of the key columns. + * + * @return null, which means ascending order for all key columns, or a String + * of the same length as key columns, that consists of only "+" + * (ascending order) and "-" (descending order). + */ + @Explain(displayName = "sort order") + public String getOrder() { + return keySerializeInfo.getProperties().getProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); + } + + public void setOrder(String orderStr) { + keySerializeInfo.getProperties().setProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, + orderStr); + } + + public List> getDistinctColumnIndices() { + return distinctColumnIndices; + } + + public void setDistinctColumnIndices( + List> distinctColumnIndices) { + this.distinctColumnIndices = distinctColumnIndices; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationGenMRRedSink1.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationGenMRRedSink1.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationGenMRRedSink1.java (revision 0) @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Stack; + +import org.apache.hadoop.hive.ql.exec.CorrelationReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.MapredWork; + +/** + * Processor for the rule - table scan followed by reduce sink. + */ +public class CorrelationGenMRRedSink1 implements NodeProcessor { + + public CorrelationGenMRRedSink1() { + } + + /** + * Reduce Scan encountered. + * + * @param nd + * the reduce sink operator encountered + * @param opProcCtx + * context + */ + public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, + Object... nodeOutputs) throws SemanticException { + CorrelationReduceSinkOperator op = (CorrelationReduceSinkOperator) nd; + GenMRProcContext ctx = (GenMRProcContext) opProcCtx; + + Map, GenMapRedCtx> mapCurrCtx = ctx + .getMapCurrCtx(); + GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2)); + Task currTask = mapredCtx.getCurrTask(); + MapredWork currPlan = (MapredWork) currTask.getWork(); + Operator currTopOp = mapredCtx.getCurrTopOp(); + String currAliasId = mapredCtx.getCurrAliasId(); + Operator reducer = op.getChildOperators().get(0); + HashMap, Task> opTaskMap = ctx + .getOpTaskMap(); + Task opMapTask = opTaskMap.get(reducer); + + ctx.setCurrTopOp(currTopOp); + ctx.setCurrAliasId(currAliasId); + ctx.setCurrTask(currTask); + + // If the plan for this reducer does not exist, initialize the plan + if (opMapTask == null) { + if (currPlan.getReducer() == null) { + GenMapRedUtils.initPlan((CorrelationReduceSinkOperator)op, ctx); + } else { + GenMapRedUtils.splitPlan((CorrelationReduceSinkOperator)op, ctx); + } + } else { + // This will happen in case of joins. 
The current plan can be thrown away + // after being merged with the original plan + GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false, false, false); + currTask = opMapTask; + ctx.setCurrTask(currTask); + } + + mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), + ctx.getCurrAliasId())); + return null; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationDispatchDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationDispatchDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationDispatchDesc.java (revision 0) @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map.Entry; + + +/** + * Correlation dispatch operator Descriptor implementation. + * + */ +@Explain(displayName = "Correlation Dispatch Operator") +public class CorrelationDispatchDesc implements Serializable { + + private static final long serialVersionUID = 1L; + + + private HashMap>> dispatchConf; + private HashMap>> dispatchValueSelectDescConf; + private HashMap>> dispatchKeySelectDescConf; + + public CorrelationDispatchDesc(){ + this.dispatchConf = new HashMap>>(); + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + + } + + public CorrelationDispatchDesc(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + for(Entry>> entry: this.dispatchConf.entrySet()){ + HashMap> tmp = new HashMap>(); + for(Integer child: entry.getValue().keySet()){ + tmp.put(child, new ArrayList()); + tmp.get(child).add(new SelectDesc(true)); + } + this.dispatchValueSelectDescConf.put(entry.getKey(), tmp); + this.dispatchKeySelectDescConf.put(entry.getKey(), tmp); + } + } + + public CorrelationDispatchDesc(HashMap>> dispatchConf, + HashMap>> dispatchKeySelectDescConf, + HashMap>> dispatchValueSelectDescConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public void setDispatchConf(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + } + + public HashMap>> getDispatchConf(){ + return this.dispatchConf; + } + + public void setDispatchValueSelectDescConf(HashMap>> dispatchValueSelectDescConf){ + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchValueSelectDescConf(){ + return this.dispatchValueSelectDescConf; + } + + public void 
setDispatchKeySelectDescConf(HashMap>> dispatchKeySelectDescConf){ + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return this.dispatchKeySelectDescConf; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0) @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.CorrelationManualForwardOperator; +import org.apache.hadoop.hive.ql.exec.CorrelationReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Utilities; + + +/** + * Correlation composite operator Descriptor implementation. 
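+ * Holds the corresponding CorrelationReduceSinkOperator, the bottom internal operators and
+ * CorrelationManualForwardOperators of each operation path, and the tags of all operation paths.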
+ * + */ +@Explain(displayName = "Correlation Composite Operator") +public class CorrelationCompositeDesc implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private CorrelationReduceSinkOperator correspondingCorrelationReduceSinkOperator; + + public CorrelationCompositeDesc(){ + + } + + public CorrelationCompositeDesc(CorrelationReduceSinkOperator correspondingCorrelationReduceSinkOperator){ + this.correspondingCorrelationReduceSinkOperator = correspondingCorrelationReduceSinkOperator; + } + + public void setCorrespondingCorrelationReduceSinkOperator( + CorrelationReduceSinkOperator correspondingCorrelationReduceSinkOperator){ + this.correspondingCorrelationReduceSinkOperator = correspondingCorrelationReduceSinkOperator; + } + + public CorrelationReduceSinkOperator getCorrespondingCorrelationReduceSinkOperator(){ + return correspondingCorrelationReduceSinkOperator; + } + + private int[] allOperationPathTags; + private ArrayList> bottomInternalOperators = new ArrayList>(); + private ArrayList correlationManualForwardOperators = new ArrayList();; + + public void setCorrelationManualForwardOperators( + ArrayList correlationManualForwardOperators){ + this.correlationManualForwardOperators = correlationManualForwardOperators; + } + + public ArrayList getCorrelationManualForwardOperators(){ + return correlationManualForwardOperators; + } + + public void setBottomInternalOperators(ArrayList> bottomInternalOperators){ + this.bottomInternalOperators = bottomInternalOperators; + } + + public ArrayList> getBottomInternalOperators(){ + return bottomInternalOperators; + } + + public void setAllOperationPathTags(int[] allOperationPathTags){ + this.allOperationPathTags = allOperationPathTags; + } + + public int[] getAllOperationPathTags(){ + return allOperationPathTags; + } + + + public void setInternalNodes(ArrayList> bottomInternalOperators, + ArrayList> topInternalOperators, int[] allOperationPathTags){ + this.allOperationPathTags = allOperationPathTags; + // the size of bottomInternalOperators and topInternalOperators should be same. 
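+ // Each top internal operator gets a CorrelationManualForwardOperator attached as its only
+ // child, and that forward operator is recorded in correlationManualForwardOperators.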
+ this.bottomInternalOperators.addAll(bottomInternalOperators); + for (Operator op: topInternalOperators){ + CorrelationManualForwardOperator cmfo = new CorrelationManualForwardOperator(); + cmfo.setChildOperators(null); + correlationManualForwardOperators.add(cmfo); + List> newChildren = + new ArrayList>(); + newChildren.add(cmfo); + op.setChildOperators(newChildren); + cmfo.setParentOperators(Utilities.makeList(op)); + } + } +} Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1172874) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -382,6 +382,7 @@ HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), + HIVEOPTCORRELATION("hive.optimize.correlation", false), // exploit intra-query correlations // Indexes HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationDispatchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationDispatchOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationDispatchOperator.java (revision 0) @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; + +/** + * Correlation dispatch operator implementation. + * + */ +public class CorrelationDispatchOperator extends Operator implements Serializable{ + + private static final long serialVersionUID = 1L; + private static String[] fieldNames; + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + protected static class DispatchHandler{ + + protected Log l4j = LogFactory.getLog(this.getClass().getName()); + + private final ObjectInspector[] inputObjInspector; + private ObjectInspector outputObjInspector; + private ObjectInspector keyObjInspector; + private ObjectInspector valueObjInspector; + private final byte inputTag; + private final byte outputTag; + private final byte childIndx; + private final ByteWritable outputTagByteWritable; + private final SelectDesc selectDesc; + private final SelectDesc keySelectDesc; + private ExprNodeEvaluator[] keyEval; + private ExprNodeEvaluator[] eval; + + // counters for debugging + private transient long cntr = 0; + private transient long nextCntr = 1; + + private long getNextCntr(long cntr) { + // A very simple counter to keep track of number of rows processed by an + // operator. 
It dumps + // every 1 million times, and quickly before that + if (cntr >= 1000000) { + return cntr + 1000000; + } + + return 10 * cntr; + } + + public long getCntr(){ + return this.cntr; + } + + private final Log LOG; + private final boolean isLogInfoEnabled; + private final String id; + + public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx, byte outputTag, + SelectDesc selectDesc, SelectDesc keySelectDesc, Log LOG, String id) + throws HiveException{ + this.inputObjInspector = inputObjInspector; + assert this.inputObjInspector.length == 1; + this.inputTag = inputTag; + this.childIndx = childIndx; + this.outputTag = outputTag; + this.selectDesc = selectDesc; + this.keySelectDesc = keySelectDesc; + this.outputTagByteWritable = new ByteWritable(outputTag); + this.LOG = LOG; + this.isLogInfoEnabled = LOG.isInfoEnabled(); + this.id = id; + init(); + } + + private void init() throws HiveException{ + ArrayList ois = new ArrayList(); + if (keySelectDesc.isSelStarNoCompute()){ + ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(0)); + }else{ + ArrayList colList = this.keySelectDesc.getColList(); + keyEval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + keyObjInspector = initEvaluatorsAndReturnStruct(keyEval, keySelectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(0).getFieldObjectInspector()); + + ois.add(keyObjInspector); + l4j.info("Key: input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); + + } + if (selectDesc.isSelStarNoCompute()){ + ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(1)); + }else{ + ArrayList colList = this.selectDesc.getColList(); + eval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + valueObjInspector = initEvaluatorsAndReturnStruct(eval, selectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(1).getFieldObjectInspector()); + + ois.add(valueObjInspector); + l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); //Yin + + } + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT outputObjInspector" + + ((StructObjectInspector) outputObjInspector).getTypeName()); //Yin + } + + public ObjectInspector getOutputObjInspector(){ + return outputObjInspector; + } + + public Object process(Object row) throws HiveException{ + Object[] keyOutput = new Object[keyEval.length]; + Object[] valueOutput = new Object[eval.length]; + ArrayList outputRow = new ArrayList(3); + List thisRow = (List)row; + if(keySelectDesc.isSelStarNoCompute()){ + outputRow.add(thisRow.get(0)); + }else{ + Object key = thisRow.get(0); + for (int j = 0; j < keyEval.length; j++) { + try { + keyOutput[j] = keyEval[j].evaluate(key); + + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new 
HiveException("Error evaluating " + + keySelectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(keyOutput); + } + + if (selectDesc.isSelStarNoCompute()){ + outputRow.add(thisRow.get(1)); + }else{ + Object value = thisRow.get(1); + for (int j = 0; j < eval.length; j++) { + try { + valueOutput[j] = eval[j].evaluate(value); + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new HiveException("Error evaluating " + + selectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(valueOutput); + } + outputRow.add(outputTagByteWritable); + + if (isLogInfoEnabled) { + cntr++; + if (cntr == nextCntr) { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarding " + cntr + " rows"); + nextCntr = getNextCntr(cntr); + } + } + + return outputRow; + } + + public void printCloseOpLog(){ + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarded " + cntr + " rows"); + } + + } + + //inputTag->(Child->List) + private HashMap>> dispatchConf; + //inputTag->(Child->List) + private HashMap>> dispatchValueSelectDescConf; + //inputTag->(Child->List) + private HashMap>> dispatchKeySelectDescConf; + //inputTag->(Child->List) + private HashMap>> dispatchHandlers; + //Child->(outputTag->DispatchHandler) + private HashMap> child2OutputTag2DispatchHandlers; + //Child->Child's inputObjInspectors + private HashMap childInputObjInspectors; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + + dispatchConf = conf.getDispatchConf(); + dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf(); + dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf(); + dispatchHandlers = new HashMap>>(); + for(Entry>> entry: dispatchConf.entrySet()){ + HashMap> tmp = new HashMap>(); + for(Entry> child2outputTag: entry.getValue().entrySet()){ + tmp.put(child2outputTag.getKey(), new ArrayList()); + int indx = 0; + for(Integer outputTag: child2outputTag.getValue()){ + tmp.get(child2outputTag.getKey()).add( + new DispatchHandler(new ObjectInspector[]{inputObjInspectors[entry.getKey()]}, + entry.getKey().byteValue(), child2outputTag.getKey().byteValue(), outputTag.byteValue(), + dispatchValueSelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), + dispatchKeySelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), LOG, id)); + indx++; + } + } + dispatchHandlers.put(entry.getKey(), tmp); + } + + child2OutputTag2DispatchHandlers = new HashMap>(); + for(Entry>> entry: dispatchConf.entrySet()){ + for(Entry> child2outputTag: entry.getValue().entrySet()){ + if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())){ + child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(), new HashMap()); + } + int indx = 0; + for(Integer outputTag: child2outputTag.getValue()){ + child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()). 
+ put(outputTag, dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx)); + indx++; + } + + } + } + + childInputObjInspectors = new HashMap(); + for(Entry> entry: child2OutputTag2DispatchHandlers.entrySet()){ + Integer l = Collections.max(entry.getValue().keySet()); + ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1]; + for(Entry e: entry.getValue().entrySet()){ + if(e.getKey().intValue() == -1){ + assert childObjInspectors.length == 1; + childObjInspectors[0] = e.getValue().getOutputObjInspector(); + }else{ + childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector(); + } + } + childInputObjInspectors.put(entry.getKey(), childObjInspectors); + } + + initializeChildren(hconf); + } + + //Each child should has its own outputObjInspector + @Override + protected void initializeChildren(Configuration hconf) throws HiveException { + state = State.INIT; + LOG.info("Operator " + id + " " + getName() + " initialized"); + if (childOperators == null) { + return; + } + LOG.info("Initializing children of " + id + " " + getName()); + for (int i = 0; i < childOperatorsArray.length; i++) { + LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " + + childOperatorsArray[i].getName() + + " " + childInputObjInspectors.get(i).length); + childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i)); + if (reporter != null) { + childOperatorsArray[i].setReporter(reporter); + } + } + } + + + private int opTags; + private int inputTag; + + @Override + public void processOp(Object row, int tag) throws HiveException { + ArrayList thisRow = (ArrayList)row; + assert thisRow.size() == 4; + opTags = ((ByteWritable)thisRow.get(3)).get(); + inputTag = (int)((ByteWritable)thisRow.get(2)).get(); + forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]); + + } + + + + @Override + public void forward(Object row, ObjectInspector rowInspector) + throws HiveException { + + if ((++outputRows % 1000) == 0) { + if (counterNameToEnum != null) { + incrCounter(numOutputRowsCntr, outputRows); + outputRows = 0; + } + } + + if (childOperatorsArray == null && childOperators != null) { + throw new HiveException( + "Internal Hive error during operator initialization."); + } + + if ((childOperatorsArray == null) || (getDone())) { + return; + } + + int childrenDone = 0; + int forwardFLag = 1; + assert childOperatorsArray.length <= 8; + for (int i = 0; i < childOperatorsArray.length; i++) { + Operator o = childOperatorsArray[i]; + if (o.getDone()) { + childrenDone++; + } else { + if ((opTags & (forwardFLag << i)) != 0){ + for(int j = 0; j> childIndx2DispatchHandlers: dispatchHandlers.values()){ + for(ArrayList dispatchHandlers: childIndx2DispatchHandlers.values()){ + for(DispatchHandler dispatchHandler: dispatchHandlers){ + dispatchHandler.printCloseOpLog(); + } + } + } + } + + + @Override + public void setBytesWritableGroupKey(BytesWritable groupKey) { + if(bytesWritableGroupKey == null){ + bytesWritableGroupKey = new BytesWritable(); + } + bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize()); + for(Operator op : childOperators){ + op.setBytesWritableGroupKey(bytesWritableGroupKey); + } + } + + @Override + public void setGroupKeyObject(Object keyObject) { + this.groupKeyObject = keyObject; + for(Operator op : childOperators){ + op.setGroupKeyObject(keyObject); + } + } + + + + @Override + public OperatorType getType() { + return OperatorType.CORRELATIONDISPATCH; + } + + /** + * @return the name of the operator + 
*/ + @Override + public String getName() { + return new String("CDP"); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1172874) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy) @@ -72,6 +72,7 @@ private Long minSplitSizePerRack; private boolean needsTagging; + private boolean needsOperationPathTagging; private boolean hadoopSupportsSplittable; private MapredLocalWork mapLocalWork; @@ -335,6 +336,16 @@ this.needsTagging = needsTagging; } + // TODO: include "Needs Operation Paths Tagging: false" into correct results + // @Explain(displayName = "Needs Operation Paths Tagging", normalExplain = false) + public boolean getNeedsOperationPathTagging() { + return needsOperationPathTagging; + } + + public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) { + this.needsOperationPathTagging = needsOperationPathTagging; + } + public boolean getHadoopSupportsSplittable() { return hadoopSupportsSplittable; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReduceSinkDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReduceSinkDesc.java (revision 0) @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Correlation ReduceSinkDesc. + * + */ +@Explain(displayName = "Correlation Reduce Output Operator") +public class CorrelationReduceSinkDesc implements Serializable { + private static final long serialVersionUID = 1L; + /** + * Key columns are passed to reducer in the "key". + */ + private java.util.ArrayList keyCols; + private java.util.ArrayList outputKeyColumnNames; + private List> distinctColumnIndices; + /** + * Value columns are passed to reducer in the "value". + */ + private java.util.ArrayList valueCols; + private java.util.ArrayList outputValueColumnNames; + /** + * Describe how to serialize the key. + */ + private TableDesc keySerializeInfo; + /** + * Describe how to serialize the value. + */ + private TableDesc valueSerializeInfo; + + /** + * The tag for this reducesink descriptor. + */ + private int tag; + + /** + * The array of tags which indicate which operations the output does not belong to. + */ + private ArrayList operationIndicators; + + /** + * Number of distribution keys. 
+ */ + private int numDistributionKeys; + + /** + * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). + * Partition columns decide the reducer that the current row goes to. + * Partition columns are not passed to reducer. + */ + private java.util.ArrayList partitionCols; + + private int numReducers; + + // a static operation path tags (set if necessary) + private ArrayList staticOperationPathTags; + + public CorrelationReduceSinkDesc() { + } + + public CorrelationReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this.keyCols = keyCols; + this.numDistributionKeys = numDistributionKeys; + this.valueCols = valueCols; + this.outputKeyColumnNames = outputKeyColumnNames; + this.outputValueColumnNames = outputValueColumnNames; + this.tag = tag; + this.numReducers = numReducers; + this.partitionCols = partitionCols; + this.keySerializeInfo = keySerializeInfo; + this.valueSerializeInfo = valueSerializeInfo; + this.distinctColumnIndices = distinctColumnIndices; + this.operationIndicators = null; + this.staticOperationPathTags = null; + } + + public CorrelationReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo, + java.util.ArrayList staticOperationPathTags, + java.util.ArrayList operationIndicators) { + this.keyCols = keyCols; + this.numDistributionKeys = numDistributionKeys; + this.valueCols = valueCols; + this.outputKeyColumnNames = outputKeyColumnNames; + this.outputValueColumnNames = outputValueColumnNames; + this.tag = tag; + this.operationIndicators = operationIndicators; + this.numReducers = numReducers; + this.partitionCols = partitionCols; + this.keySerializeInfo = keySerializeInfo; + this.valueSerializeInfo = valueSerializeInfo; + this.distinctColumnIndices = distinctColumnIndices; + this.staticOperationPathTags = staticOperationPathTags; + } + + public java.util.ArrayList getStaticOperationPathTags(){ + return staticOperationPathTags; + } + + + public java.util.ArrayList getOutputKeyColumnNames() { + return outputKeyColumnNames; + } + + public void setOutputKeyColumnNames( + java.util.ArrayList outputKeyColumnNames) { + this.outputKeyColumnNames = outputKeyColumnNames; + } + + public java.util.ArrayList getOutputValueColumnNames() { + return outputValueColumnNames; + } + + public void setOutputValueColumnNames( + java.util.ArrayList outputValueColumnNames) { + this.outputValueColumnNames = outputValueColumnNames; + } + + @Explain(displayName = "key expressions") + public java.util.ArrayList getKeyCols() { + return keyCols; + } + + public void setKeyCols(final java.util.ArrayList keyCols) { + this.keyCols = keyCols; + } + + public int getNumDistributionKeys() { + return this.numDistributionKeys; + } + + public void setNumDistributionKeys(int numKeys) { + this.numDistributionKeys = numKeys; + } + + @Explain(displayName = "value expressions") + public java.util.ArrayList getValueCols() { + return valueCols; + } + + public void setValueCols(final 
java.util.ArrayList valueCols) { + this.valueCols = valueCols; + } + + @Explain(displayName = "Map-reduce partition columns") + public java.util.ArrayList getPartitionCols() { + return partitionCols; + } + + public void setPartitionCols( + final java.util.ArrayList partitionCols) { + this.partitionCols = partitionCols; + } + + @Explain(displayName = "tag") + public int getTag() { + return tag; + } + + public void setTag(int tag) { + this.tag = tag; + } + + + @Explain(displayName = "operationIndicators") + public ArrayList getOperationIndicators() { + return operationIndicators; + } + + public void setTag(ArrayList operationIndicators) { + this.operationIndicators = operationIndicators; + } + + + /** + * Returns the number of reducers for the map-reduce job. -1 means to decide + * the number of reducers at runtime. This enables Hive to estimate the number + * of reducers based on the map-reduce input data size, which is only + * available right before we start the map-reduce job. + */ + public int getNumReducers() { + return numReducers; + } + + public void setNumReducers(int numReducers) { + this.numReducers = numReducers; + } + + public TableDesc getKeySerializeInfo() { + return keySerializeInfo; + } + + public void setKeySerializeInfo(TableDesc keySerializeInfo) { + this.keySerializeInfo = keySerializeInfo; + } + + public TableDesc getValueSerializeInfo() { + return valueSerializeInfo; + } + + public void setValueSerializeInfo(TableDesc valueSerializeInfo) { + this.valueSerializeInfo = valueSerializeInfo; + } + + /** + * Returns the sort order of the key columns. + * + * @return null, which means ascending order for all key columns, or a String + * of the same length as key columns, that consists of only "+" + * (ascending order) and "-" (descending order). + */ + @Explain(displayName = "sort order") + public String getOrder() { + return keySerializeInfo.getProperties().getProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); + } + + public void setOrder(String orderStr) { + keySerializeInfo.getProperties().setProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, + orderStr); + } + + public List> getDistinctColumnIndices() { + return distinctColumnIndices; + } + + public void setDistinctColumnIndices( + List> distinctColumnIndices) { + this.distinctColumnIndices = distinctColumnIndices; + } +}
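A minimal usage sketch, not part of the patch: the flag registered in HiveConf above, hive.optimize.correlation, gates the correlation optimizer and defaults to false. It can be enabled from the CLI with set hive.optimize.correlation=true; or programmatically, for example (the class name below is illustrative only):

import org.apache.hadoop.hive.conf.HiveConf;

public class EnableCorrelationOptimizer {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Turn on the correlation optimizer added by this patch (hive.optimize.correlation).
    conf.setBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION, true);
    // Map-side aggregation (hive.map.aggr) must also be on for the regular-to-map-side
    // group-by rewrite guarded by HIVEMAPSIDEAGGREGATE && HIVEOPTCORRELATION in SemanticAnalyzer.
    conf.setBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE, true);
    System.out.println("correlation optimizer enabled: "
        + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION));
  }
}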