### Eclipse Workspace Patch 1.0
#P hive-2206-p8
Index: ql/src/test/results/compiler/plan/groupby1.q.xml
===================================================================
--- ql/src/test/results/compiler/plan/groupby1.q.xml (revision 1224666)
+++ ql/src/test/results/compiler/plan/groupby1.q.xml (working copy)
@@ -311,6 +311,24 @@
+
+ VALUE._col0
+
+
@@ -383,21 +401,7 @@
-
-
- _col1
-
-
-
-
-
-
-
- double
-
-
-
-
+
@@ -494,7 +498,7 @@
_col0
-
+ key
@@ -590,7 +594,7 @@
-
+
@@ -1214,7 +1218,7 @@
_col1
-
+ _col1
@@ -1228,7 +1232,7 @@
_col0
-
+ _col0
@@ -1247,10 +1251,10 @@
-
+
-
+
@@ -1328,7 +1332,7 @@
_col0
-
+ KEY._col0
@@ -1380,7 +1384,7 @@
-
+
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java (revision 0)
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * BaseReduceSinkDesc.
+ *
+ */
+@Explain(displayName = "Base Reduce Output Operator")
+public class BaseReduceSinkDesc implements Serializable {
+ private static final long serialVersionUID = 1L;
+ /**
+ * Key columns are passed to reducer in the "key".
+ */
+ protected java.util.ArrayList<ExprNodeDesc> keyCols;
+ protected java.util.ArrayList<String> outputKeyColumnNames;
+ protected List<List<Integer>> distinctColumnIndices;
+ /**
+ * Value columns are passed to reducer in the "value".
+ */
+ protected java.util.ArrayList<ExprNodeDesc> valueCols;
+ protected java.util.ArrayList<String> outputValueColumnNames;
+ /**
+ * Describe how to serialize the key.
+ */
+ protected TableDesc keySerializeInfo;
+ /**
+ * Describe how to serialize the value.
+ */
+ protected TableDesc valueSerializeInfo;
+
+ /**
+ * The tag for this reducesink descriptor.
+ */
+ protected int tag;
+
+ /**
+ * Number of distribution keys.
+ */
+ protected int numDistributionKeys;
+
+ /**
+ * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
+ * Partition columns decide the reducer that the current row goes to.
+ * Partition columns are not passed to reducer.
+ */
+ protected java.util.ArrayList<ExprNodeDesc> partitionCols;
+
+ protected int numReducers;
+
+ public BaseReduceSinkDesc() {
+ }
+
+ public java.util.ArrayList<String> getOutputKeyColumnNames() {
+ return outputKeyColumnNames;
+ }
+
+ public void setOutputKeyColumnNames(
+ java.util.ArrayList<String> outputKeyColumnNames) {
+ this.outputKeyColumnNames = outputKeyColumnNames;
+ }
+
+ public java.util.ArrayList<String> getOutputValueColumnNames() {
+ return outputValueColumnNames;
+ }
+
+ public void setOutputValueColumnNames(
+ java.util.ArrayList<String> outputValueColumnNames) {
+ this.outputValueColumnNames = outputValueColumnNames;
+ }
+
+ @Explain(displayName = "key expressions")
+ public java.util.ArrayList<ExprNodeDesc> getKeyCols() {
+ return keyCols;
+ }
+
+ public void setKeyCols(final java.util.ArrayList<ExprNodeDesc> keyCols) {
+ this.keyCols = keyCols;
+ }
+
+ public int getNumDistributionKeys() {
+ return this.numDistributionKeys;
+ }
+
+ public void setNumDistributionKeys(int numKeys) {
+ this.numDistributionKeys = numKeys;
+ }
+
+ @Explain(displayName = "value expressions")
+ public java.util.ArrayList<ExprNodeDesc> getValueCols() {
+ return valueCols;
+ }
+
+ public void setValueCols(final java.util.ArrayList<ExprNodeDesc> valueCols) {
+ this.valueCols = valueCols;
+ }
+
+ @Explain(displayName = "Map-reduce partition columns")
+ public java.util.ArrayList<ExprNodeDesc> getPartitionCols() {
+ return partitionCols;
+ }
+
+ public void setPartitionCols(
+ final java.util.ArrayList<ExprNodeDesc> partitionCols) {
+ this.partitionCols = partitionCols;
+ }
+
+ @Explain(displayName = "tag")
+ public int getTag() {
+ return tag;
+ }
+
+ public void setTag(int tag) {
+ this.tag = tag;
+ }
+
+ /**
+ * Returns the number of reducers for the map-reduce job. -1 means to decide
+ * the number of reducers at runtime. This enables Hive to estimate the number
+ * of reducers based on the map-reduce input data size, which is only
+ * available right before we start the map-reduce job.
+ */
+ public int getNumReducers() {
+ return numReducers;
+ }
+
+ public void setNumReducers(int numReducers) {
+ this.numReducers = numReducers;
+ }
+
+ public TableDesc getKeySerializeInfo() {
+ return keySerializeInfo;
+ }
+
+ public void setKeySerializeInfo(TableDesc keySerializeInfo) {
+ this.keySerializeInfo = keySerializeInfo;
+ }
+
+ public TableDesc getValueSerializeInfo() {
+ return valueSerializeInfo;
+ }
+
+ public void setValueSerializeInfo(TableDesc valueSerializeInfo) {
+ this.valueSerializeInfo = valueSerializeInfo;
+ }
+
+ /**
+ * Returns the sort order of the key columns.
+ *
+ * @return null, which means ascending order for all key columns, or a String
+ * of the same length as key columns, that consists of only "+"
+ * (ascending order) and "-" (descending order).
+ */
+ @Explain(displayName = "sort order")
+ public String getOrder() {
+ return keySerializeInfo.getProperties().getProperty(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER);
+ }
+
+ public void setOrder(String orderStr) {
+ keySerializeInfo.getProperties().setProperty(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER,
+ orderStr);
+ }
+
+ public List<List<Integer>> getDistinctColumnIndices() {
+ return distinctColumnIndices;
+ }
+
+ public void setDistinctColumnIndices(
+ List<List<Integer>> distinctColumnIndices) {
+ this.distinctColumnIndices = distinctColumnIndices;
+ }
+}
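The sort-order contract documented on getOrder()/setOrder() above (one "+" or "-" character per key column) is easy to get wrong when descriptors are built by hand. A minimal standalone sketch of building such a string follows; the SortOrderStrings class and its method are illustrative assumptions, not code from Hive or from this patch.

    // Illustrative helper (assumption, not patch code): build the string that a
    // ReduceSink-style descriptor stores as its serialization sort order.
    public final class SortOrderStrings {
      private SortOrderStrings() {
      }

      // One character per key column: '+' for ascending, '-' for descending.
      public static String toOrderString(boolean[] ascendingPerKeyColumn) {
        StringBuilder order = new StringBuilder(ascendingPerKeyColumn.length);
        for (boolean ascending : ascendingPerKeyColumn) {
          order.append(ascending ? '+' : '-');
        }
        return order.toString();
      }

      public static void main(String[] args) {
        // Three key columns, the second one descending -> "+-+"
        System.out.println(toOrderString(new boolean[] {true, false, true}));
      }
    }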
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy)
@@ -44,7 +44,15 @@
* @param hiveConf
*/
public void initialize(HiveConf hiveConf) {
+ CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer();
transformations = new ArrayList<Transform>();
+ // Add the correlation optimizer for the first-phase analysis of the query plan tree.
+ // The first phase records the original opColumnExprMap, opParseCtx and opRowResolver,
+ // since these may be changed by later optimizers (e.g. entries in opColumnExprMap may be deleted).
+ // TODO: make the correlation optimizer a single-phase transformation.
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)) {
+ transformations.add(correlationOptimizer);
+ }
// Add the transformation that computes the lineage information.
transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) {
@@ -74,6 +82,11 @@
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
transformations.add(new ReduceSinkDeDuplication());
}
+ // The second phase of the correlation optimizer performs correlation detection and query plan tree transformation.
+ // It must be the last transformation added to the list.
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)) {
+ transformations.add(correlationOptimizer);
+ }
}
/**
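The comments in the hunks above rely on registering the same CorrelationOptimizer instance as both the first and the last transformation, so its first invocation only records state and its second invocation rewrites the plan. A minimal standalone sketch of that two-phase Transform pattern follows; the class, interface and method names are illustrative assumptions, not types from Hive or from this patch.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of a transform registered twice: pass 1 snapshots, pass 2 rewrites.
    public class TwoPhaseTransformSketch {
      interface Plan { }

      interface Transform {
        Plan transform(Plan plan);
      }

      static class SnapshotThenRewrite implements Transform {
        private boolean firstPhaseDone = false;
        private Plan snapshot;

        public Plan transform(Plan plan) {
          if (!firstPhaseDone) {
            snapshot = plan;                      // phase 1: record the original plan only
            firstPhaseDone = true;
            return plan;
          }
          return rewriteUsing(snapshot, plan);    // phase 2: rewrite using the snapshot
        }

        private Plan rewriteUsing(Plan original, Plan current) {
          return current;                         // placeholder for the real rewrite
        }
      }

      public static void main(String[] args) {
        Transform correlation = new SnapshotThenRewrite();
        List<Transform> transformations = new ArrayList<Transform>();
        transformations.add(correlation);         // first transformation: snapshot
        // ... the other optimizer transformations would sit here ...
        transformations.add(correlation);         // last transformation: rewrite
        Plan plan = new Plan() { };
        for (Transform t : transformations) {
          plan = t.transform(plan);
        }
      }
    }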
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -48,6 +48,7 @@
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
@@ -58,7 +59,6 @@
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.RecordReader;
@@ -185,7 +185,7 @@
private List loadTableWork;
private List loadFileWork;
private Map joinContext;
- private final HashMap<TableScanOperator, Table> topToTable;
+ private HashMap<TableScanOperator, Table> topToTable;
private QB qb;
private ASTNode ast;
private int destTableId;
@@ -206,6 +206,9 @@
private final UnparseTranslator unparseTranslator;
private final GlobalLimitCtx globalLimitCtx = new GlobalLimitCtx();
+ // stores the variable ParseContext.groupbyRegular2MapSide
+ Map<ReduceSinkOperator, GroupByOperator> groupbyRegular2MapSide;
+
//prefix for column names auto generated by hive
private final String autogenColAliasPrfxLbl;
private final boolean autogenColAliasPrfxIncludeFuncName;
@@ -286,6 +289,7 @@
autogenColAliasPrfxIncludeFuncName = HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME);
queryProperties = new QueryProperties();
+ groupbyRegular2MapSide = new HashMap<ReduceSinkOperator, GroupByOperator>();
}
@Override
@@ -304,6 +308,8 @@
opParseCtx.clear();
groupOpToInputTables.clear();
prunedPartitions.clear();
+ topToTable.clear();
+ groupbyRegular2MapSide.clear();
}
public void init(ParseContext pctx) {
@@ -311,6 +317,7 @@
opToPartList = pctx.getOpToPartList();
opToSamplePruner = pctx.getOpToSamplePruner();
topOps = pctx.getTopOps();
+ topToTable = pctx.getTopToTable();
topSelOps = pctx.getTopSelOps();
opParseCtx = pctx.getOpParseCtx();
loadTableWork = pctx.getLoadTableWork();
@@ -325,6 +332,7 @@
groupOpToInputTables = pctx.getGroupOpToInputTables();
prunedPartitions = pctx.getPrunedPartitions();
setLineageInfo(pctx.getLineageInfo());
+ groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide();
}
public ParseContext getParseContext() {
@@ -332,7 +340,8 @@
topSelOps, opParseCtx, joinContext, topToTable, loadTableWork,
loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
- opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks);
+ opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
+ groupbyRegular2MapSide);
}
@SuppressWarnings("nls")
@@ -2952,6 +2961,7 @@
ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false);
reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
numExprs++;
+ colExprMap.put(colInfo.getInternalName(), expr);
}
distinctColIndices.add(distinctIndices);
}
@@ -2969,15 +2979,18 @@
for (int i = 1; i < value.getChildCount(); i++) {
ASTNode parameter = (ASTNode) value.getChild(i);
if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
- reduceValues.add(genExprNodeDesc(parameter,
- reduceSinkInputRowResolver));
+ ExprNodeDesc expr = genExprNodeDesc(parameter,
+ reduceSinkInputRowResolver);
+ reduceValues.add(expr);
outputValueColumnNames
.add(getColumnInternalName(reduceValues.size() - 1));
String field = Utilities.ReduceField.VALUE.toString() + "."
+ getColumnInternalName(reduceValues.size() - 1);
- reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field,
+ ColumnInfo colInfo = new ColumnInfo(field,
reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null,
- false));
+ false);
+ reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
+ colExprMap.put(colInfo.getInternalName(),expr);
}
}
}
@@ -2989,14 +3002,18 @@
TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get(
inputField).getType();
- reduceValues.add(new ExprNodeColumnDesc(type,
- getColumnInternalName(inputField), "", false));
+ ExprNodeDesc expr = new ExprNodeColumnDesc(type,
+ getColumnInternalName(inputField), "", false);
+ reduceValues.add(expr);
inputField++;
outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
String field = Utilities.ReduceField.VALUE.toString() + "."
+ getColumnInternalName(reduceValues.size() - 1);
+ ColumnInfo colInfo = new ColumnInfo(field,
+ type, null, false);
reduceSinkOutputRowResolver.putExpression(entry.getValue(),
- new ColumnInfo(field, type, null, false));
+ colInfo);
+ colExprMap.put(colInfo.getInternalName(),expr);
}
}
@@ -5941,7 +5958,8 @@
// insert a select operator here used by the ColumnPruner to reduce
// the data to shuffle
curr = insertSelectAllPlanForGroupBy(dest, curr);
- if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) &&
+ !conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) {
if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
} else {
@@ -5950,7 +5968,19 @@
} else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
curr = genGroupByPlan2MR(dest, qb, curr);
} else {
+ Operator mapSide = null;
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) {
+ mapSide = genGroupByPlanMapAggr1MR(dest, qb, curr);
+ mapSide = (Operator)((Operator)mapSide.getParentOperators().get(0)).getParentOperators().get(0);
+ curr.getChildOperators().remove(mapSide);
+ }
curr = genGroupByPlan1MR(dest, qb, curr);
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) {
+ groupbyRegular2MapSide.put((ReduceSinkOperator) curr.getParentOperators().get(0),
+ (GroupByOperator) mapSide);
+ }
}
}
@@ -7345,7 +7375,8 @@
opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable,
loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
- opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks);
+ opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
+ groupbyRegular2MapSide);
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (working copy)
@@ -61,6 +61,7 @@
private Reporter rp;
private boolean abort = false;
private boolean isTagged = false;
+ private boolean isOperationPathTagged = false; // whether the operation path is tagged
private long cntr = 0;
private long nextCntr = 1;
@@ -116,6 +117,7 @@
reducer.setParentOperators(null); // clear out any parents as reducer is the
// root
isTagged = gWork.getNeedsTagging();
+ isOperationPathTagged = gWork.getNeedsOperationPathTagging();
try {
keyTableDesc = gWork.getKeyDesc();
inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
@@ -164,8 +166,9 @@
private BytesWritable groupKey;
- ArrayList<Object> row = new ArrayList<Object>(3);
+ ArrayList<Object> row = new ArrayList<Object>(4);
ByteWritable tag = new ByteWritable();
+ ByteWritable operationPathTags = new ByteWritable();
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
@@ -188,6 +191,14 @@
keyWritable.setSize(size);
}
+ operationPathTags.set((byte)0);
+ if (isOperationPathTagged) {
+ // remove the operation path tag from the end of the key
+ int size = keyWritable.getSize() - 1;
+ operationPathTags.set(keyWritable.get()[size]);
+ keyWritable.setSize(size);
+ }
+
if (!keyWritable.equals(groupKey)) {
// If a operator wants to do some work at the beginning of a group
if (groupKey == null) { // the first group
@@ -234,6 +245,7 @@
row.add(valueObject[tag.get()]);
// The tag is not used any more, we should remove it.
row.add(tag);
+ row.add(operationPathTags);
if (isLogInfoEnabled) {
cntr++;
if (cntr == nextCntr) {
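The hunk above peels a one-byte operation-path tag off the end of the serialized reduce key (keyWritable.setSize(size - 1)) before group comparison, the same trick already used for the join tag. A standalone sketch of that convention with plain byte arrays is shown below; the class and method names are illustrative assumptions, not APIs from Hive.

    import java.util.Arrays;

    // Sketch: append a tag byte to a serialized key on the map side and peel it
    // off again on the reduce side, so grouping only sees the untagged key bytes.
    public final class TrailingTagSketch {
      private TrailingTagSketch() {
      }

      static byte[] appendTag(byte[] key, byte tag) {
        byte[] tagged = Arrays.copyOf(key, key.length + 1);
        tagged[key.length] = tag;                    // tag rides at the end of the key
        return tagged;
      }

      static byte stripTag(byte[] taggedKey, byte[] bareKeyOut) {
        byte tag = taggedKey[taggedKey.length - 1];
        System.arraycopy(taggedKey, 0, bareKeyOut, 0, taggedKey.length - 1);
        return tag;                                  // the caller groups on bareKeyOut
      }

      public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        byte[] tagged = appendTag(key, (byte) 7);
        byte[] bare = new byte[tagged.length - 1];
        byte tag = stripTag(tagged, bare);
        System.out.println(Arrays.equals(key, bare) + ", tag=" + tag);  // true, tag=7
      }
    }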
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy)
@@ -60,16 +60,16 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
/**
* General utility common functions for the Processor to convert operator into
@@ -114,8 +114,14 @@
}
if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
+ plan.setNeedsOperationPathTagging(false);
}
+ if (op.getConf().getNeedsOperationPathTagging()) {
+ plan.setNeedsTagging(true);
+ plan.setNeedsOperationPathTagging(true);
+ }
+
assert currTopOp != null;
List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
String currAliasId = opProcCtx.getCurrAliasId();
@@ -178,6 +184,7 @@
opTaskMap.put(reducer, currTask);
if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
+ plan.setNeedsOperationPathTagging(false);
}
ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf();
plan.setNumReduceTasks(desc.getNumReducers());
@@ -310,6 +317,7 @@
if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
+ plan.setNeedsOperationPathTagging(false);
}
initUnionPlan(opProcCtx, currTask, false);
@@ -1046,6 +1054,7 @@
// dependent on the redTask
if (reducer.getClass() == JoinOperator.class) {
cplan.setNeedsTagging(true);
+ cplan.setNeedsOperationPathTagging(false);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (working copy)
@@ -80,6 +80,11 @@
if (conf != null && conf.isGatherStats()) {
gatherStats(row);
}
+
+ if (conf != null && conf.isForwardRowNumber()) {
+ setRowNumber(rowNumber+1);
+ }
+
forward(row, inputObjInspectors[tag]);
}
@@ -169,6 +174,12 @@
if (conf == null) {
return;
}
+
+ LOG.info(this.getName() + " forward row number " + conf.isForwardRowNumber());
+ if (conf.isForwardRowNumber()) {
+ initializeRowNumber();
+ }
+
if (!conf.isGatherStats()) {
return;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (working copy)
@@ -26,48 +26,20 @@
*
*/
@Explain(displayName = "Reduce Output Operator")
-public class ReduceSinkDesc implements Serializable {
+public class ReduceSinkDesc extends BaseReduceSinkDesc implements Serializable {
private static final long serialVersionUID = 1L;
- /**
- * Key columns are passed to reducer in the "key".
- */
- private java.util.ArrayList<ExprNodeDesc> keyCols;
- private java.util.ArrayList<String> outputKeyColumnNames;
- private List<List<Integer>> distinctColumnIndices;
- /**
- * Value columns are passed to reducer in the "value".
- */
- private java.util.ArrayList<ExprNodeDesc> valueCols;
- private java.util.ArrayList<String> outputValueColumnNames;
- /**
- * Describe how to serialize the key.
- */
- private TableDesc keySerializeInfo;
- /**
- * Describe how to serialize the value.
- */
- private TableDesc valueSerializeInfo;
- /**
- * The tag for this reducesink descriptor.
- */
- private int tag;
+ private boolean needsOperationPathTagging;
+ public boolean getNeedsOperationPathTagging() {
+ return needsOperationPathTagging;
+ }
- /**
- * Number of distribution keys.
- */
- private int numDistributionKeys;
+ public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) {
+ this.needsOperationPathTagging = needsOperationPathTagging;
+ }
- /**
- * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
- * Partition columns decide the reducer that the current row goes to.
- * Partition columns are not passed to reducer.
- */
- private java.util.ArrayList<ExprNodeDesc> partitionCols;
+ public ReduceSinkDesc() {
- private int numReducers;
-
- public ReduceSinkDesc() {
}
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
@@ -78,6 +50,20 @@
java.util.ArrayList<String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
+ this(keyCols, numDistributionKeys, valueCols,
+ outputKeyColumnNames, distinctColumnIndices, outputValueColumnNames, tag,
+ partitionCols, numReducers, keySerializeInfo, valueSerializeInfo, false);
+ }
+
+ public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
+ int numDistributionKeys,
+ java.util.ArrayList<ExprNodeDesc> valueCols,
+ java.util.ArrayList<String> outputKeyColumnNames,
+ List<List<Integer>> distinctColumnIndices,
+ java.util.ArrayList<String> outputValueColumnNames, int tag,
+ java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
+ final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo,
+ boolean needsOperationPathTagging) {
this.keyCols = keyCols;
this.numDistributionKeys = numDistributionKeys;
this.valueCols = valueCols;
@@ -89,126 +75,7 @@
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
+ this.needsOperationPathTagging = needsOperationPathTagging;
}
- public java.util.ArrayList<String> getOutputKeyColumnNames() {
- return outputKeyColumnNames;
- }
-
- public void setOutputKeyColumnNames(
- java.util.ArrayList<String> outputKeyColumnNames) {
- this.outputKeyColumnNames = outputKeyColumnNames;
- }
-
- public java.util.ArrayList<String> getOutputValueColumnNames() {
- return outputValueColumnNames;
- }
-
- public void setOutputValueColumnNames(
- java.util.ArrayList<String> outputValueColumnNames) {
- this.outputValueColumnNames = outputValueColumnNames;
- }
-
- @Explain(displayName = "key expressions")
- public java.util.ArrayList<ExprNodeDesc> getKeyCols() {
- return keyCols;
- }
-
- public void setKeyCols(final java.util.ArrayList<ExprNodeDesc> keyCols) {
- this.keyCols = keyCols;
- }
-
- public int getNumDistributionKeys() {
- return this.numDistributionKeys;
- }
-
- public void setNumDistributionKeys(int numKeys) {
- this.numDistributionKeys = numKeys;
- }
-
- @Explain(displayName = "value expressions")
- public java.util.ArrayList<ExprNodeDesc> getValueCols() {
- return valueCols;
- }
-
- public void setValueCols(final java.util.ArrayList<ExprNodeDesc> valueCols) {
- this.valueCols = valueCols;
- }
-
- @Explain(displayName = "Map-reduce partition columns")
- public java.util.ArrayList<ExprNodeDesc> getPartitionCols() {
- return partitionCols;
- }
-
- public void setPartitionCols(
- final java.util.ArrayList<ExprNodeDesc> partitionCols) {
- this.partitionCols = partitionCols;
- }
-
- @Explain(displayName = "tag")
- public int getTag() {
- return tag;
- }
-
- public void setTag(int tag) {
- this.tag = tag;
- }
-
- /**
- * Returns the number of reducers for the map-reduce job. -1 means to decide
- * the number of reducers at runtime. This enables Hive to estimate the number
- * of reducers based on the map-reduce input data size, which is only
- * available right before we start the map-reduce job.
- */
- public int getNumReducers() {
- return numReducers;
- }
-
- public void setNumReducers(int numReducers) {
- this.numReducers = numReducers;
- }
-
- public TableDesc getKeySerializeInfo() {
- return keySerializeInfo;
- }
-
- public void setKeySerializeInfo(TableDesc keySerializeInfo) {
- this.keySerializeInfo = keySerializeInfo;
- }
-
- public TableDesc getValueSerializeInfo() {
- return valueSerializeInfo;
- }
-
- public void setValueSerializeInfo(TableDesc valueSerializeInfo) {
- this.valueSerializeInfo = valueSerializeInfo;
- }
-
- /**
- * Returns the sort order of the key columns.
- *
- * @return null, which means ascending order for all key columns, or a String
- * of the same length as key columns, that consists of only "+"
- * (ascending order) and "-" (descending order).
- */
- @Explain(displayName = "sort order")
- public String getOrder() {
- return keySerializeInfo.getProperties().getProperty(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER);
- }
-
- public void setOrder(String orderStr) {
- keySerializeInfo.getProperties().setProperty(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER,
- orderStr);
- }
-
- public List<List<Integer>> getDistinctColumnIndices() {
- return distinctColumnIndices;
- }
-
- public void setDistinctColumnIndices(
- List<List<Integer>> distinctColumnIndices) {
- this.distinctColumnIndices = distinctColumnIndices;
- }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -72,6 +72,7 @@
private Long minSplitSizePerRack;
private boolean needsTagging;
+ private boolean needsOperationPathTagging;
private boolean hadoopSupportsSplittable;
private MapredLocalWork mapLocalWork;
@@ -338,6 +339,16 @@
this.needsTagging = needsTagging;
}
+ // TODO: include "Needs Operation Paths Tagging: false" into correct results
+ // @Explain(displayName = "Needs Operation Paths Tagging", normalExplain = false)
+ public boolean getNeedsOperationPathTagging() {
+ return needsOperationPathTagging;
+ }
+
+ public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) {
+ this.needsOperationPathTagging = needsOperationPathTagging;
+ }
+
public boolean getHadoopSupportsSplittable() {
return hadoopSupportsSplittable;
}
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 1224666)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy)
@@ -279,6 +279,7 @@
private void populateMapRedPlan3(Table src, Table src2) throws SemanticException {
mr.setNumReduceTasks(Integer.valueOf(5));
mr.setNeedsTagging(true);
+ mr.setNeedsOperationPathTagging(false);
ArrayList<String> outputColumns = new ArrayList<String>();
for (int i = 0; i < 2; i++) {
outputColumns.add("_col" + i);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy)
@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.OutputCollector;
@@ -512,7 +513,7 @@
LOG.debug("End group Done");
}
- protected boolean allInitializedParentsAreClosed() {
+ public boolean allInitializedParentsAreClosed() {
if (parentOperators != null) {
for (Operator<? extends Serializable> parent : parentOperators) {
if(parent==null){
@@ -1332,4 +1333,52 @@
public void cleanUpInputFileChangedOp() throws HiveException {
}
+ // bytesWritableGroupKey is only used when a query plan is optimized by CorrelationOptimizer.
+ // CorrelationFakeReduceSinkOperator will use this variable to determine when it needs to start or end the group
+ // for its child operator.
+ protected BytesWritable bytesWritableGroupKey;
+
+ public void setBytesWritableGroupKey(BytesWritable groupKey) {
+ if (bytesWritableGroupKey == null) {
+ bytesWritableGroupKey = new BytesWritable();
+ }
+ bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize());
+ }
+
+ public BytesWritable getBytesWritableGroupKey() {
+ return bytesWritableGroupKey;
+ }
+
+ // The number of the current row
+ protected long rowNumber;
+
+ public void initializeRowNumber() {
+ this.rowNumber = 0L;
+ LOG.info("Operator " + id + " " + getName() + " row number initialized to 0");
+ if (childOperators == null) {
+ return;
+ }
+ LOG.info("Initializing row numbers of children of " + id + " " + getName());
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ childOperatorsArray[i].initializeRowNumber();
+ }
+ }
+
+ public void setRowNumber(long rowNumber) {
+ this.rowNumber = rowNumber;
+ if (childOperators == null) {
+ return;
+ }
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ assert rowNumber >= childOperatorsArray[i].getRowNumber();
+ if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+ childOperatorsArray[i].setRowNumber(rowNumber);
+ }
+ }
+ }
+
+ public long getRowNumber() {
+ return rowNumber;
+ }
+
}
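The row-number bookkeeping added above lets a TableScanOperator push the count of forwarded rows down to every descendant while skipping children that are already up to date. A minimal standalone sketch of that cascade is below; the Node class is an illustrative stand-in for Operator, not a type from the patch.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: a parent propagates its row number to children exactly once per change,
    // mirroring Operator.setRowNumber() in the hunk above.
    public class RowNumberCascadeSketch {
      static class Node {
        final List<Node> children = new ArrayList<Node>();
        long rowNumber;

        void setRowNumber(long rowNumber) {
          this.rowNumber = rowNumber;
          for (Node child : children) {
            if (child.rowNumber != rowNumber) {   // skip children that already saw this value
              child.setRowNumber(rowNumber);
            }
          }
        }
      }

      public static void main(String[] args) {
        Node scan = new Node();
        Node select = new Node();
        Node sink = new Node();
        scan.children.add(select);
        select.children.add(sink);
        for (long row = 1; row <= 3; row++) {
          scan.setRowNumber(row);                 // the table scan advances once per input row
        }
        System.out.println(sink.rowNumber);       // 3: the count reached the leaf operator
      }
    }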
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0)
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBExpr;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * Implementation of the correlation optimizer. The optimization is based on the paper
+ * "YSmart: Yet Another SQL-to-MapReduce Translator" (Rubao Lee et al.).
+ * This optimizer first detects three kinds of correlations, Input Correlation (IC),
+ * Transit Correlation (TC) and Job Flow Correlation (JFC), and then merges the
+ * correlated MapReduce-jobs (MR-jobs) into a single MR-job.
+ * Correlation detection and query plan tree transformation form the last transformation in the
+ * {@link Optimizer}. Since opColumnExprMap, opParseCtx and opRowResolver may be changed by other
+ * optimizers, the correlation optimizer currently has two phases. The first phase is the first
+ * transformation in the {@link Optimizer}; it records the original opColumnExprMap, opParseCtx
+ * and opRowResolver. The second phase (the last transformation) then performs correlation
+ * detection and the query plan tree transformation.
+ *
+ * Correlations:
+ * For the definitions of the correlations, see the original paper.
+ *
+ * Rules:
+ * The rules for merging correlated MR-jobs implemented in this correlation optimizer are:
+ * 1. If an MR-job for a join has the same partitioning keys as all of its preceding MR-jobs,
+ * the correlation optimizer merges these MR-jobs into one MR-job.
+ * 2. If an MR-job for a group-by and aggregation has the same partitioning keys as its
+ * preceding MR-job, the correlation optimizer merges these two MR-jobs into one MR-job.
+ * Note: in the current implementation, if the correlation optimizer detects that the MR-jobs of
+ * a sub-plan tree are correlated, it transforms this sub-plan tree into a single MR-job only
+ * when the input of this sub-plan tree is not a temporary table; otherwise it ignores this
+ * sub-plan tree.
+ *
+ * Future work:
+ * Several extensions would enhance the correlation optimizer; three examples:
+ * 1. Add a rule that merges two MR-jobs into a single MR-job when they share the same
+ * partitioning keys and have common input tables.
+ * 2. The current implementation treats MR-jobs with the same partitioning keys as correlated;
+ * this condition can be relaxed to common (overlapping) partitioning keys.
+ * 3. The current implementation cannot optimize MR-jobs for aggregation functions with a
+ * DISTINCT keyword; this should be supported by a future implementation of the optimizer.
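+ *
+ * An illustrative example (assuming the standard src(key, value) sample table; the exact query
+ * is an illustration, not taken from this patch):
+ *
+ *   SELECT t1.key, t1.cnt, t2.maxv
+ *   FROM (SELECT key, count(value) AS cnt FROM src GROUP BY key) t1
+ *   JOIN (SELECT key, max(value) AS maxv FROM src GROUP BY key) t2
+ *   ON (t1.key = t2.key)
+ *
+ * Both sub-queries and the join partition their data on "key", so under the rules above the
+ * correlated MR-jobs can be evaluated in a single MR-job.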
+ */
+
+public class CorrelationOptimizer implements Transform {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName());
+ private final HashMap<String, String> AliastoTabName;
+ private final HashMap<String, Table> AliastoTab;
+
+ public CorrelationOptimizer() {
+ super();
+ AliastoTabName = new HashMap<String, String>();
+ AliastoTab = new HashMap<String, Table>();
+ pGraphContext = null;
+ }
+
+ private void initializeAliastoTabNameMapping(QB qb) {
+ for (String alias: qb.getAliases()) {
+ AliastoTabName.put(alias, qb.getTabNameForAlias(alias));
+ AliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias));
+ }
+ for (String subqalias: qb.getSubqAliases()) {
+ QBExpr qbexpr = qb.getSubqForAlias(subqalias);
+ initializeAliastoTabNameMapping(qbexpr.getQB());
+ }
+ }
+
+ protected ParseContext pGraphContext;
+ private LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
+ private final LinkedHashMap<Operator<? extends Serializable>, OpParseContext> originalOpParseCtx =
+ new LinkedHashMap<Operator<? extends Serializable>, OpParseContext>();
+ private final LinkedHashMap<Operator<? extends Serializable>, RowResolver> originalOpRowResolver =
+ new LinkedHashMap<Operator<? extends Serializable>, RowResolver>();
+ private final LinkedHashMap<Operator<? extends Serializable>, Map<String, ExprNodeDesc>> originalOpColumnExprMap =
+ new LinkedHashMap<Operator<? extends Serializable>, Map<String, ExprNodeDesc>>();
+
+ private boolean isPhase1 = true;
+
+ private Map<ReduceSinkOperator, GroupByOperator> groupbyRegular2MapSide;
+
+ /**
+ * Transform the query plan tree. First, detect correlations between operators;
+ * then group the correlated operators and transform the plan tree accordingly.
+ * @param pctx
+ * current parse context
+ */
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+ if (isPhase1) {
+
+ pGraphContext = pctx;
+ opParseCtx = pctx.getOpParseCtx();
+
+ CorrelationNodePhase1ProcCtx correlationCtx = new CorrelationNodePhase1ProcCtx();
+
+ Map opRules = new LinkedHashMap();
+
+ Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules, correlationCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topop nodes
+ ArrayList topNodes = new ArrayList();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ isPhase1 = false;
+
+ } else {
+
+ /* Types of correlations:
+ * 1) Input Correlation: multiple nodes have input correlation (IC)
+ * if their input relation sets are not disjoint;
+ * 2) Transit Correlation: multiple nodes have transit correlation (TC)
+ * if they have not only input correlation, but also the same partition key;
+ * 3) Job Flow Correlation: a node has job flow correlation (JFC) with one of
+ * its child nodes if it has the same partition key as that child node.
+ */
+
+ pGraphContext = pctx;
+ opParseCtx = pctx.getOpParseCtx();
+
+ groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide();
+
+ initializeAliastoTabNameMapping(pGraphContext.getQB());
+
+ // 1: detect correlations
+ CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx();
+
+ Map opRules = new LinkedHashMap();
+ opRules.put(new RuleRegExp("R1", "RS%"), new CorrelationNodeProc());
+
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topop nodes
+ ArrayList topNodes = new ArrayList();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+
+ // 2: transform the query plan tree
+ LOG.info("Begain query plan transformation based on intra-query correlations");
+ for (IntraQueryCorrelation correlation: correlationCtx.getCorrelations()) {
+ pGraphContext = CorrelationOptimizerUtils.applyCorrelation(
+ correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver,
+ groupbyRegular2MapSide, originalOpParseCtx);
+ }
+ LOG.info("Finish query plan transformation based on intra-query correlations");
+
+ }
+
+ return pGraphContext;
+ }
+
+
+
+ private NodeProcessor getPhase1DefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ Operator<? extends Serializable> op = (Operator<? extends Serializable>) nd;
+ OpParseContext opCtx = opParseCtx.get(op);
+
+ if (op.getColumnExprMap() != null) {
+ originalOpColumnExprMap.put(op, op.getColumnExprMap());
+ }
+ originalOpParseCtx.put(op, opCtx);
+ originalOpRowResolver.put(op, opCtx.getRowResolver());
+
+ return null;
+ }
+ };
+ }
+
+ private class CorrelationNodeProc implements NodeProcessor {
+
+ public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop) {
+ Operator<? extends Serializable> op = rsop.getChildOperators().get(0);
+ while(!op.getName().equals("RS")) {
+ if (op.getName().equals("FS")) {
+ return null;
+ }
+ assert op.getChildOperators().size() <= 1;
+ op = op.getChildOperators().get(0);
+ }
+ return (ReduceSinkOperator)op;
+ }
+
+ /**
+ * Find all peer ReduceSinkOperators (which have the same child operator of op) of op (op included).
+ */
+ private ArrayList<ReduceSinkOperator> findPeerReduceSinkOperators(ReduceSinkOperator op) {
+
+ ArrayList<ReduceSinkOperator> peerReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+
+ List<Operator<? extends Serializable>> children = op.getChildOperators();
+ assert children.size() == 1;
+
+ for (Operator<? extends Serializable> parent: children.get(0).getParentOperators()) {
+ assert (parent instanceof ReduceSinkOperator);
+ peerReduceSinkOperators.add((ReduceSinkOperator)parent);
+ }
+
+ return peerReduceSinkOperators;
+ }
+
+ private ArrayList<ReduceSinkOperator> findCorrelatedReduceSinkOperators(Operator<? extends Serializable> op,
+ HashSet<String> keyColumns, IntraQueryCorrelation correlation) throws Exception {
+
+ LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName());
+
+ ArrayList correlatedReduceSinkOps = new ArrayList();
+ if (op.getParentOperators() == null) {
+ return correlatedReduceSinkOps;
+ }
+ if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)) {
+ assert op.getParentOperators().size() == 1;
+ correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(
+ (Operator<? extends Serializable>) op.getParentOperators().get(0), keyColumns, correlation));
+ } else if (originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)) {
+ HashSet newKeyColumns = new HashSet();
+ for (String keyColumn: keyColumns) {
+ ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn);
+ if (col instanceof ExprNodeColumnDesc) {
+ newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn());
+ }
+ }
+
+ if (op.getName().equals("JOIN")) {
+ HashSet tableNeedToCheck = new HashSet();
+ for (String keyColumn: keyColumns) {
+ for (ColumnInfo cinfo: originalOpParseCtx.get(op).getRowResolver().getColumnInfos()) {
+ if (keyColumn.equals(cinfo.getInternalName())) {
+ tableNeedToCheck.add(cinfo.getTabAlias());
+ }
+ }
+ }
+
+ for (Object parent: op.getParentOperators()) {
+ assert originalOpParseCtx.get(parent).getRowResolver().getTableNames().size() == 1;
+ for (String tbl: originalOpParseCtx.get(parent).getRowResolver().getTableNames()) {
+ if (tableNeedToCheck.contains(tbl)) {
+ correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)parent, newKeyColumns, correlation));
+ break;
+ }
+ }
+ }
+
+ } else {
+ assert op.getParentOperators().size() == 1;
+ correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)op.getParentOperators().get(0), newKeyColumns, correlation));
+ }
+
+ } else if (originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator) {
+
+ HashSet newKeyColumns = new HashSet();
+ for (String keyColumn: keyColumns) {
+ ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn);
+ if (col instanceof ExprNodeColumnDesc) {
+ newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn());
+ }
+ }
+
+ ReduceSinkOperator rsop = (ReduceSinkOperator)op;
+ HashSet thisKeyColumns = new HashSet();
+ for (ExprNodeDesc key: rsop.getConf().getKeyCols()) {
+ if (key instanceof ExprNodeColumnDesc) {
+ thisKeyColumns.add(((ExprNodeColumnDesc)key).getColumn());
+ }
+ }
+
+ boolean isCorrelated = false;
+ Set intersection = new HashSet(newKeyColumns);
+ intersection.retainAll(thisKeyColumns);
+ // TODO: should use if intersection is empty to evaluate if two corresponding operators are correlated
+ //isCorrelated = !(intersection.isEmpty());
+ isCorrelated = (intersection.size() == thisKeyColumns.size());
+
+ ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop);
+
+ if (isCorrelated) {
+ if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("JOIN")) {
+ if (intersection.size() != nextChildReduceSinkOperator.getConf().getKeyCols().size() ||
+ intersection.size() != rsop.getConf().getKeyCols().size()) {
+ isCorrelated = false;
+ }
+ }
+ }
+
+ if (isCorrelated) {
+ LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is correlated");
+ LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+ LOG.info("--keys of child operator: " + keyColumns.toString());
+ LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+ if (((Operator extends Serializable>)(op.getChildOperators().get(0))).getName().equals("JOIN")) {
+ ArrayList peers = findPeerReduceSinkOperators(rsop);
+ correlatedReduceSinkOps.addAll(peers);
+ } else {
+ correlatedReduceSinkOps.add(rsop);
+ }
+ // this if block are useful when we use "isCorrelated = !(intersection.isEmpty());" for the evaluation of isCorrelated
+ if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("GBY") &&
+ (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())) {
+ LOG.info("--found this RS-GBY pattern that needs to be replaced to GBY-RS-GBY patterns " +
+ "common keys" + intersection.size() + ", keys of next group by operator" + nextChildReduceSinkOperator.getConf().getKeyCols().size());
+ correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator);
+ }
+
+ } else {
+ LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is not correlated");
+ LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+ LOG.info("--keys of child operator: " + keyColumns.toString());
+ LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+ correlatedReduceSinkOps.clear();
+ correlation.getRSGBYToBeReplacedByGBYRSGBY().clear();
+ }
+ } else {
+ throw new Exception("Correlation optimizer: ReduceSinkOperator " + op.getIdentifier() + " does not have ColumnExprMap");
+ }
+ return correlatedReduceSinkOps;
+ }
+
+ private ArrayList<ReduceSinkOperator> exploitJFC(ReduceSinkOperator op, CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation) {
+
+ correlationCtx.addWalked(op);
+ correlation.addToAllReduceSinkOperators(op);
+
+ ArrayList ReduceSinkOperators = new ArrayList();
+
+ boolean shouldDetect = true;
+
+ ArrayList<ExprNodeDesc> keys = op.getConf().getKeyCols();
+ HashSet<String> keyColumns = new HashSet<String>();
+ for (ExprNodeDesc key: keys) {
+ if (!(key instanceof ExprNodeColumnDesc)) {
+ shouldDetect = false;
+ } else {
+ keyColumns.add(((ExprNodeColumnDesc)key).getColumn());
+ }
+ }
+
+ if (shouldDetect) {
+ ArrayList newReduceSinkOperators = new ArrayList();
+ for (Operator<? extends Serializable> parent: op.getParentOperators()) {
+ try {
+ LOG.info("Operator " + op.getIdentifier() + ": start detecting correlation from this operator");
+ LOG.info("--keys of this operator: " + keyColumns.toString());
+ ArrayList<ReduceSinkOperator> correlatedReduceSinkOperators =
+ findCorrelatedReduceSinkOperators(parent, keyColumns, correlation);
+ if (correlatedReduceSinkOperators == null || correlatedReduceSinkOperators.size() == 0) {
+ newReduceSinkOperators.add(op);
+ } else {
+ ArrayList<ReduceSinkOperator> deduplicatedCorrelatedReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+ for (ReduceSinkOperator rsop: correlatedReduceSinkOperators) {
+ if (!deduplicatedCorrelatedReduceSinkOperators.contains(rsop)) {
+ deduplicatedCorrelatedReduceSinkOperators.add(rsop);
+ }
+ }
+ for (ReduceSinkOperator rsop: deduplicatedCorrelatedReduceSinkOperators) {
+ if ( !correlation.getUp2downRSops().containsKey(op) ) {
+ correlation.getUp2downRSops().put(op, new ArrayList());
+ }
+ correlation.getUp2downRSops().get(op).add(rsop);
+
+ if ( !correlation.getDown2upRSops().containsKey(rsop)) {
+ correlation.getDown2upRSops().put(rsop, new ArrayList());
+ }
+ correlation.getDown2upRSops().get(rsop).add(op);
+ ArrayList exploited = exploitJFC(rsop, correlationCtx, correlation);
+ if (exploited == null || exploited.size() == 0) {
+ newReduceSinkOperators.add(rsop);
+ } else {
+ newReduceSinkOperators.addAll(exploited);
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ ReduceSinkOperators.clear();
+ ReduceSinkOperators.addAll(newReduceSinkOperators);
+ }
+ return ReduceSinkOperators;
+ }
+
+ private TableScanOperator findTableScanOperator(Operator<? extends Serializable> startPoint) {
+ Operator<? extends Serializable> thisOp = (Operator<? extends Serializable>) startPoint.getParentOperators().get(0);
+ while(true) {
+ if (thisOp.getName().equals("RS")) {
+ return null;
+ } else if (thisOp.getName().equals("TS")) {
+ return (TableScanOperator)thisOp;
+ }
+ else {
+ if (thisOp.getParentOperators() != null) {
+ thisOp = (Operator<? extends Serializable>) thisOp.getParentOperators().get(0);
+ }
+ else {
+ break;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void annotateOpPlan(IntraQueryCorrelation correlation) {
+ HashMap bottomReduceSink2OpPlanMap = new HashMap();
+ int count = 0;
+
+ for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) {
+ if (!bottomReduceSink2OpPlanMap.containsKey(rsop)) {
+ bottomReduceSink2OpPlanMap.put(rsop, count);
+ for (ReduceSinkOperator peerRSop: findPeerReduceSinkOperators(rsop)) {
+ if (correlation.getBottomReduceSinkOperators().contains(peerRSop)) {
+ bottomReduceSink2OpPlanMap.put(peerRSop, count);
+ }
+ }
+ count++;
+ }
+ }
+ correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OpPlanMap);
+ }
+
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ LOG.info("Walk to operator " + ((Operator)nd).getIdentifier() + " " + ((Operator)nd).getName());
+
+ CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx)ctx;
+
+ ReduceSinkOperator op = (ReduceSinkOperator)nd;
+
+ if (correlationCtx.isWalked(op)) {
+ return null;
+ }
+
+ if (op.getConf().getKeyCols().size() == 0 ||
+ (!op.getChildOperators().get(0).getName().equals("JOIN") &&
+ !op.getChildOperators().get(0).getName().equals("GBY"))) {
+ correlationCtx.addWalked(op);
+ return null;
+ }
+
+ // 1: find out correlation
+ IntraQueryCorrelation correlation = new IntraQueryCorrelation();
+ ArrayList<ReduceSinkOperator> peerReduceSinkOperators = findPeerReduceSinkOperators(op);
+ ArrayList<ReduceSinkOperator> bottomReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+ for (ReduceSinkOperator rsop: peerReduceSinkOperators) {
+
+ ArrayList<ReduceSinkOperator> thisBottomReduceSinkOperators = exploitJFC(rsop, correlationCtx, correlation);
+
+ // TODO: if we use "isCorrelated = !(intersection.isEmpty());" in the method findCorrelatedReduceSinkOperators
+ // for the evaluation of isCorrelated, uncomment the following if block. The reduceSinkOperator at the top level
+ // should take special take, since we cannot evaluate if the relationship of the set of keys of this operator with
+ // that of its next parent reduceSinkOperator.
+ // if (peerReduceSinkOperators.size() == 1) {
+ // correlation.addToRSGBYToBeReplacedByGBYRSGBY(rsop);
+ // }
+ if (thisBottomReduceSinkOperators.size() == 0) {
+ thisBottomReduceSinkOperators.add(rsop);
+ } else {
+ boolean isClear = false;
+ for (ReduceSinkOperator bottomrsop: thisBottomReduceSinkOperators) {
+ TableScanOperator tsop = findTableScanOperator(bottomrsop);
+ if (tsop == null) {
+ isClear = true; // currently the optimizer can only optimize correlations involving source tables
+ } else {
+ if ( !correlation.getTop2TSops().containsKey(rsop) ) {
+ correlation.getTop2TSops().put(rsop, new ArrayList());
+ }
+ correlation.getTop2TSops().get(rsop).add(tsop);
+
+ if ( !correlation.getBottom2TSops().containsKey(bottomrsop)) {
+ correlation.getBottom2TSops().put(bottomrsop, new ArrayList());
+ }
+ correlation.getBottom2TSops().get(bottomrsop).add(tsop);
+ }
+ }
+ if (isClear) {
+ thisBottomReduceSinkOperators.clear();
+ thisBottomReduceSinkOperators.add(rsop);
+ }
+ }
+ bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators);
+ }
+
+ if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) {
+ LOG.info("has job flow correlation");
+ correlation.setJobFlowCorrelation(true);
+ correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators);
+ annotateOpPlan(correlation);
+ }
+
+ if (correlation.hasJobFlowCorrelation()) {
+ boolean hasICandTC = findICandTC(correlation);
+ LOG.info("has input correlation and transit correlation? " + hasICandTC);
+ correlation.setInputCorrelation(hasICandTC);
+ correlation.setTransitCorrelation(hasICandTC);
+ boolean isInvolve = isInvolveSelfJoin(correlation);
+ LOG.info("involve self-join? " + isInvolve);
+ correlation.setInvolveSelfJoin(isInvolve);
+ //TODO: support self-join involved cases. For self-join related operation paths, after the correlation dispatch operator, each path should be filtered by a
+ // filter operator
+ if (!isInvolve) {
+ LOG.info("correlation detected");
+ correlationCtx.addCorrelation(correlation);
+ } else {
+ LOG.info("correlation discarded. The current optimizer cannot optimize it");
+ }
+ }
+
+ correlationCtx.addWalkedAll(peerReduceSinkOperators);
+
+ return null;
+ }
+
+ private boolean isInvolveSelfJoin(IntraQueryCorrelation correlation) {
+ boolean isInvolve = false;
+ for (Entry<String, ArrayList<ReduceSinkOperator>> entry: correlation.getTable2CorrelatedRSops().entrySet()) {
+ for (ReduceSinkOperator rsop: entry.getValue()) {
+ HashSet intersection = new HashSet(findPeerReduceSinkOperators(rsop));
+ intersection.retainAll(entry.getValue());
+
+ // if involve self-join
+ if (intersection.size() > 1) {
+ isInvolve = true;
+ return isInvolve;
+ }
+ }
+ }
+ return isInvolve;
+ }
+
+ private boolean findICandTC(IntraQueryCorrelation correlation) {
+
+ boolean hasICandTC = false;
+ HashMap<String, ArrayList<ReduceSinkOperator>> table2RSops = new HashMap<String, ArrayList<ReduceSinkOperator>>();
+ HashMap<String, ArrayList<TableScanOperator>> table2TSops = new HashMap<String, ArrayList<TableScanOperator>>();
+
+ for (Entry<ReduceSinkOperator, ArrayList<TableScanOperator>> entry: correlation.getBottom2TSops().entrySet()) {
+ String tbl = AliastoTabName.get(entry.getValue().get(0).getConf().getAlias());
+ if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)) {
+ table2RSops.put(tbl, new ArrayList<ReduceSinkOperator>());
+ table2TSops.put(tbl, new ArrayList<TableScanOperator>());
+ }
+ assert entry.getValue().size() == 1;
+ table2RSops.get(tbl).add(entry.getKey());
+ table2TSops.get(tbl).add(entry.getValue().get(0));
+ }
+
+ for (Entry<String, ArrayList<ReduceSinkOperator>> entry: table2RSops.entrySet()) {
+ if (entry.getValue().size() > 1) {
+ hasICandTC = true;
+ }
+ }
+
+ correlation.setICandTCCorrelation(table2RSops, table2TSops);
+
+ return hasICandTC;
+ }
+ }
+
+ private NodeProcessor getDefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ LOG.info("Walk to operator " + ((Operator)nd).getIdentifier() + " " + ((Operator)nd).getName());
+ return null;
+ }
+ };
+ }
+
+ private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx {
+
+ }
+
+
+ public class IntraQueryCorrelation {
+
+ private final HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>> down2upRSops = new HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>>();
+ private final HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>> up2downRSops = new HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>>();
+
+ private final HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>> top2TSops = new HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>>();
+ private final HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>> bottom2TSops = new HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>>();
+
+ private ArrayList<ReduceSinkOperator> topReduceSinkOperators;
+ private ArrayList<ReduceSinkOperator> bottomReduceSinkOperators;
+
+ private HashMap<String, ArrayList<ReduceSinkOperator>> table2CorrelatedRSops;
+
+ private HashMap<String, ArrayList<TableScanOperator>> table2CorrelatedTSops;
+
+ private HashMap<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap;
+
+ private final HashMap>> dispatchConf =
+ new HashMap>>(); //inputTag->(Child->outputTag)
+ private final HashMap>> dispatchValueSelectDescConf =
+ new HashMap>>(); //inputTag->(Child->SelectDesc)
+ private final HashMap>> dispatchKeySelectDescConf =
+ new HashMap>>(); //inputTag->(Child->SelectDesc)
+
+ private final HashSet<ReduceSinkOperator> allReduceSinkOperators = new HashSet<ReduceSinkOperator>();
+
+ // this set contains all ReduceSink-GroupBy operator pairs that should be replaced by the GroupBy-ReduceSink-GroupBy pattern.
+ // the first GroupByOperator is hash-based and is used to group records.
+ private final HashSet<ReduceSinkOperator> rSGBYToBeReplacedByGBYRSGBY = new HashSet<ReduceSinkOperator>();
+
+ public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop) {
+ rSGBYToBeReplacedByGBYRSGBY.add(rsop);
+ }
+
+ public HashSet<ReduceSinkOperator> getRSGBYToBeReplacedByGBYRSGBY() {
+ return rSGBYToBeReplacedByGBYRSGBY;
+ }
+
+ public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) {
+ allReduceSinkOperators.add(rsop);
+ }
+
+ public HashSet<ReduceSinkOperator> getAllReduceSinkOperators() {
+ return allReduceSinkOperators;
+ }
+
+ public HashMap>> getDispatchConf() {
+ return dispatchConf;
+ }
+
+ public HashMap>> getDispatchValueSelectDescConf() {
+ return dispatchValueSelectDescConf;
+ }
+
+ public HashMap>> getDispatchKeySelectDescConf() {
+ return dispatchKeySelectDescConf;
+ }
+
+ public void addOperationPathToDispatchConf(Integer opPlan) {
+ if (!dispatchConf.containsKey(opPlan)) {
+ dispatchConf.put(opPlan, new HashMap>());
+ }
+ }
+
+ public HashMap> getDispatchConfForOperationPath(Integer opPlan) {
+ return dispatchConf.get(opPlan);
+ }
+
+ public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) {
+ if (!dispatchValueSelectDescConf.containsKey(opPlan)) {
+ dispatchValueSelectDescConf.put(opPlan, new HashMap>());
+ }
+ }
+
+ public HashMap> getDispatchValueSelectDescConfForOperationPath(Integer opPlan) {
+ return dispatchValueSelectDescConf.get(opPlan);
+ }
+
+ public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) {
+ if (!dispatchKeySelectDescConf.containsKey(opPlan)) {
+ dispatchKeySelectDescConf.put(opPlan, new HashMap>());
+ }
+ }
+
+ public HashMap> getDispatchKeySelectDescConfForOperationPath(Integer opPlan) {
+ return dispatchKeySelectDescConf.get(opPlan);
+ }
+
+ private boolean inputCorrelation = false;
+ private boolean transitCorrelation = false;
+ private boolean jobFlowCorrelation = false;
+
+ public void setBottomReduceSink2OperationPathMap(HashMap bottomReduceSink2OperationPathMap) {
+ this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap;
+ }
+
+ public HashMap getBottomReduceSink2OperationPathMap() {
+ return bottomReduceSink2OperationPathMap;
+ }
+
+ public void setInputCorrelation(boolean inputCorrelation) {
+ this.inputCorrelation = inputCorrelation;
+ }
+
+ public boolean hasInputCorrelation() {
+ return inputCorrelation;
+ }
+
+ public void setTransitCorrelation(boolean transitCorrelation) {
+ this.transitCorrelation = transitCorrelation;
+ }
+
+ public boolean hasTransitCorrelation() {
+ return transitCorrelation;
+ }
+
+ public void setJobFlowCorrelation(boolean jobFlowCorrelation) {
+ this.jobFlowCorrelation = jobFlowCorrelation;
+ }
+
+ public boolean hasJobFlowCorrelation() {
+ return jobFlowCorrelation;
+ }
+
+ public HashMap> getTop2TSops() {
+ return top2TSops;
+ }
+
+ public HashMap> getBottom2TSops() {
+ return bottom2TSops;
+ }
+
+ public HashMap> getDown2upRSops() {
+ return down2upRSops;
+ }
+
+ public HashMap> getUp2downRSops() {
+ return up2downRSops;
+ }
+
+ public void setJFCCorrelation(ArrayList peerReduceSinkOperators,
+ ArrayList bottomReduceSinkOperators) {
+ this.topReduceSinkOperators = peerReduceSinkOperators;
+ this.bottomReduceSinkOperators = bottomReduceSinkOperators;
+ }
+
+
+ public ArrayList getTopReduceSinkOperators() {
+ return topReduceSinkOperators;
+ }
+
+ public ArrayList getBottomReduceSinkOperators() {
+ return bottomReduceSinkOperators;
+ }
+
+ public void setICandTCCorrelation(HashMap> table2RSops,
+ HashMap> table2TSops) {
+ this.table2CorrelatedRSops = table2RSops;
+ this.table2CorrelatedTSops = table2TSops;
+ }
+
+ public HashMap> getTable2CorrelatedRSops() {
+ return table2CorrelatedRSops;
+ }
+
+ public HashMap> getTable2CorrelatedTSops() {
+ return table2CorrelatedTSops;
+ }
+
+ private boolean isInvolveSelfJoin = false;
+
+ public boolean isInvolveSelfJoin() {
+ return isInvolveSelfJoin;
+ }
+
+ public void setInvolveSelfJoin(boolean isInvolveSelfJoin) {
+ this.isInvolveSelfJoin = isInvolveSelfJoin;
+ }
+
+ }
+
+ private class CorrelationNodeProcCtx implements NodeProcessorCtx {
+
+ private final HashSet walked = new HashSet();
+
+ private final ArrayList correlations = new ArrayList();
+
+ public void addCorrelation(IntraQueryCorrelation correlation) {
+ correlations.add(correlation);
+ }
+
+ public ArrayList getCorrelations() {
+ return correlations;
+ }
+
+ public boolean isWalked(ReduceSinkOperator op) {
+ return walked.contains(op);
+ }
+
+ public void addWalked(ReduceSinkOperator op) {
+ walked.add(op);
+ }
+
+ public void addWalkedAll(Collection c) {
+ walked.addAll(c);
+ }
+ }
+
+}
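The table2RSops / table2TSops grouping above is the heart of the input- and transit-correlation test: a correlation exists as soon as one source table feeds more than one bottom ReduceSinkOperator. Below is a minimal, stand-alone sketch of that test, using plain JDK collections and String stand-ins for the Hive operator objects; it is illustrative only and not part of the patch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InputCorrelationSketch {

  // Group bottom ReduceSinks (represented here by their ids) by the table they scan and
  // report whether any table is shared, mirroring the hasICandTC flag computed above.
  public static boolean hasInputOrTransitCorrelation(Map<String, String> reduceSink2Table) {
    Map<String, List<String>> table2RSops = new HashMap<String, List<String>>();
    for (Map.Entry<String, String> e : reduceSink2Table.entrySet()) {
      String tbl = e.getValue();
      if (!table2RSops.containsKey(tbl)) {
        table2RSops.put(tbl, new ArrayList<String>());
      }
      table2RSops.get(tbl).add(e.getKey());
    }
    for (List<String> rsops : table2RSops.values()) {
      if (rsops.size() > 1) {
        return true; // two sub-plans read the same table, so the scan can be shared
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, String> rs2tbl = new HashMap<String, String>();
    rs2tbl.put("RS_1", "src");
    rs2tbl.put("RS_2", "src");    // the same table is scanned twice
    rs2tbl.put("RS_3", "src_b");
    System.out.println(hasInputOrTransitCorrelation(rs2tbl)); // prints true
  }
}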
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java (revision 0)
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Reduce Sink Operator sends output to the reduce stage.
+ **/
+public abstract class BaseReduceSinkOperator extends TerminalOperator
+ implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ protected static final Log LOG = LogFactory.getLog(BaseReduceSinkOperator.class
+ .getName());
+
+ /**
+ * The evaluators for the key columns. Key columns decide the sort order on
+ * the reducer side. Key columns are passed to the reducer in the "key".
+ */
+ protected transient ExprNodeEvaluator[] keyEval;
+ /**
+ * The evaluators for the value columns. Value columns are passed to reducer
+ * in the "value".
+ */
+ protected transient ExprNodeEvaluator[] valueEval;
+ /**
+ * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+ * Hive language). Partition columns decide the reducer that the current row
+ * goes to. Partition columns are not passed to reducer.
+ */
+ protected transient ExprNodeEvaluator[] partitionEval;
+
+ // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
+ // ready
+ transient Serializer keySerializer;
+ transient boolean keyIsText;
+ transient Serializer valueSerializer;
+ transient int tag;
+ transient byte[] tagByte = new byte[1];
+ transient protected int numDistributionKeys;
+ transient protected int numDistinctExprs;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+
+ try {
+ keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getKeyCols()) {
+ keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ numDistributionKeys = conf.getNumDistributionKeys();
+ distinctColIndices = conf.getDistinctColumnIndices();
+ numDistinctExprs = distinctColIndices.size();
+
+ valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getValueCols()) {
+ valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getPartitionCols()) {
+ partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ tag = conf.getTag();
+ tagByte[0] = (byte) tag;
+ LOG.info("Using tag = " + tag);
+
+ TableDesc keyTableDesc = conf.getKeySerializeInfo();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
+ keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ TableDesc valueTableDesc = conf.getValueSerializeInfo();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ firstRow = true;
+ initializeChildren(hconf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ transient InspectableObject tempInspectableObject = new InspectableObject();
+ transient HiveKey keyWritable = new HiveKey();
+ transient Writable value;
+
+ transient StructObjectInspector keyObjectInspector;
+ transient StructObjectInspector valueObjectInspector;
+ transient ObjectInspector[] partitionObjectInspectors;
+
+ transient Object[][] cachedKeys;
+ transient Object[] cachedValues;
+ transient List> distinctColIndices;
+
+ boolean firstRow;
+
+ transient Random random;
+
+ /**
+ * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
+ * column indices for group by.
+ * Puts the return values into a StructObjectInspector with output column
+ * names.
+ *
+ * If distinctColIndices is empty, the object inspector is the same as
+ * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
+ */
+ protected static StructObjectInspector initEvaluatorsAndReturnStruct(
+ ExprNodeEvaluator[] evals, List> distinctColIndices,
+ List outputColNames,
+ int length, ObjectInspector rowInspector)
+ throws HiveException {
+ int inspectorLen = evals.length > length ? length + 1 : evals.length;
+ List sois = new ArrayList(inspectorLen);
+
+ // keys
+ ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
+ sois.addAll(Arrays.asList(fieldObjectInspectors));
+
+ if (evals.length > length) {
+ // union keys
+ List uois = new ArrayList();
+ for (List distinctCols : distinctColIndices) {
+ List names = new ArrayList();
+ List eois = new ArrayList();
+ int numExprs = 0;
+ for (int i : distinctCols) {
+ names.add(HiveConf.getColumnInternalName(numExprs));
+ eois.add(evals[i].initialize(rowInspector));
+ numExprs++;
+ }
+ uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
+ }
+ UnionObjectInspector uoi =
+ ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
+ sois.add(uoi);
+ }
+ return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois );
+ }
+
+ @Override
+ public abstract void processOp(Object row, int tag) throws HiveException;
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return "BaseReduceSink";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return null;
+ }
+}
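The sizing rule in initEvaluatorsAndReturnStruct is easy to miss: the key struct carries one field per distribution key plus, only when extra distinct expressions exist, a single trailing union field that multiplexes the distinct-column groups. A tiny, self-contained illustration of that rule (no Hive types involved, purely for clarity):

public class KeyInspectorShapeSketch {

  // Mirrors "int inspectorLen = evals.length > length ? length + 1 : evals.length;" from
  // BaseReduceSinkOperator.initEvaluatorsAndReturnStruct: "length" is the number of
  // distribution keys; any evaluators beyond it are folded into one union field.
  static int inspectorLen(int numEvaluators, int numDistributionKeys) {
    return numEvaluators > numDistributionKeys ? numDistributionKeys + 1 : numEvaluators;
  }

  public static void main(String[] args) {
    System.out.println(inspectorLen(3, 1)); // 1 key + 2 distinct exprs -> 2 fields (key + union)
    System.out.println(inspectorLen(2, 2)); // no distinct exprs -> plain 2-field key struct
  }
}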
Index: ql/src/test/results/compiler/plan/groupby3.q.xml
===================================================================
--- ql/src/test/results/compiler/plan/groupby3.q.xml (revision 1224666)
+++ ql/src/test/results/compiler/plan/groupby3.q.xml (working copy)
@@ -158,7 +158,125 @@
-
+
+
+ VALUE._col4
+
+
+ _col5
+
+
+
+
+
+
+
+ string
+
+
+
+
+
+
+ VALUE._col3
+
+
+ _col4
+
+
+
+
+
+
+
+
+
+
+ VALUE._col2
+
+
+ _col3
+
+
+
+
+
+
+
+
+
+ count
+
+
+ sum
+
+
+
+
+
+
+
+
+ bigint
+
+
+
+
+
+
+ double
+
+
+
+
+
+
+
+
+
+
+ VALUE._col1
+
+
+ _col2
+
+
+
+
+
+
+
+
+
+
+ VALUE._col0
+
+
+ _col1
+
+
+
+
+
+
+
+
+
+
+ KEY._col0:0._col0
+
+
+ _col0
+
+
+
+
+
+
+
+
+
+
@@ -176,21 +294,7 @@
-
-
- _col0
-
-
-
-
-
-
-
- string
-
-
-
-
+
@@ -261,98 +365,19 @@
-
-
- _col1
-
-
-
-
-
-
-
- double
-
-
-
-
+
-
-
- _col2
-
-
-
-
-
-
-
-
-
- count
-
-
- sum
-
-
-
-
-
-
-
-
- bigint
-
-
-
-
-
-
-
-
-
-
-
+
-
-
- _col3
-
-
-
-
-
-
-
-
+
-
-
- _col4
-
-
-
-
-
-
-
-
+
-
-
- _col5
-
-
-
-
-
-
-
-
+
@@ -433,7 +458,7 @@
VALUE._col0
-
+
@@ -966,7 +991,7 @@
-
+
@@ -1045,7 +1070,7 @@
src
-
+
@@ -1246,7 +1271,7 @@
src
-
+
@@ -1532,7 +1557,7 @@
-
+
@@ -1545,7 +1570,7 @@
-
+
@@ -1558,7 +1583,7 @@
-
+
@@ -1600,7 +1625,7 @@
_col4
-
+ _col4
@@ -1614,7 +1639,7 @@
_col3
-
+ _col3
@@ -1628,7 +1653,7 @@
_col2
-
+ _col2
@@ -1636,13 +1661,13 @@
-
+ _col1
-
+ _col1
@@ -1650,13 +1675,13 @@
-
+ _col0
-
+ _col0
@@ -1664,7 +1689,7 @@
-
+
@@ -1675,19 +1700,19 @@
-
+
-
+
-
+
-
+
-
+
@@ -1751,7 +1776,7 @@
_col0
-
+
@@ -1764,7 +1789,7 @@
_col1
-
+
@@ -1777,7 +1802,7 @@
_col2
-
+
@@ -1843,7 +1868,7 @@
VALUE._col0
-
+
@@ -2040,7 +2065,7 @@
-
+
@@ -2053,7 +2078,7 @@
-
+
@@ -2066,7 +2091,7 @@
-
+
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy)
@@ -33,6 +33,7 @@
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
@@ -80,7 +81,10 @@
// reducer
private Map> groupOpToInputTables;
private Map prunedPartitions;
-
+
+ // Maps a group-by operator implemented with the RS-GBY pattern to its map-side aggregation
+ // counterpart (GBY-RS-GBY). This variable is only used by the CorrelationOptimizer.
+ Map groupbyRegular2MapSide;
/**
* The lineage information.
*/
@@ -157,7 +161,8 @@
HashMap opToSamplePruner,
SemanticAnalyzer.GlobalLimitCtx globalLimitCtx,
HashMap nameToSplitSample,
- HashSet semanticInputs, List> rootTasks) {
+ HashSet semanticInputs, List> rootTasks,
+ Map groupbyRegular2MapSide) {
this.conf = conf;
this.qb = qb;
this.ast = ast;
@@ -183,6 +188,7 @@
this.globalLimitCtx = globalLimitCtx;
this.semanticInputs = semanticInputs;
this.rootTasks = rootTasks;
+ this.groupbyRegular2MapSide = groupbyRegular2MapSide;
}
/**
@@ -529,4 +535,8 @@
this.rootTasks.remove(rootTask);
this.rootTasks.addAll(tasks);
}
+
+ public Map getGroupbyRegular2MapSide() {
+ return groupbyRegular2MapSide;
+ }
}
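A hedged sketch of how the CorrelationOptimizer might consult the new map when it prefers a map-side aggregation plan. The generic parameters were stripped from this patch text, so the Map<GroupByOperator, GroupByOperator> shape, the helper name pickGroupByImplementation, and the imports (java.util.Map, org.apache.hadoop.hive.ql.exec.GroupByOperator) are assumptions, not part of the patch.

  // Assumed shape: regular (RS-GBY) group-by operator -> its map-side (GBY-RS-GBY) variant.
  static GroupByOperator pickGroupByImplementation(ParseContext pctx, GroupByOperator regularGby) {
    Map<GroupByOperator, GroupByOperator> regular2MapSide = pctx.getGroupbyRegular2MapSide();
    if (regular2MapSide != null && regular2MapSide.containsKey(regularGby)) {
      return regular2MapSide.get(regularGby); // switch to the map-side aggregation plan
    }
    return regularGby; // otherwise keep the plan that was actually generated
  }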
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java (revision 0)
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Descriptor for the correlation local simulative (fake) ReduceSink operator.
+ *
+ */
+@Explain(displayName = "Correlation Local Simulative Reduce Output Operator")
+public class CorrelationLocalSimulativeReduceSinkDesc extends BaseReduceSinkDesc implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public CorrelationLocalSimulativeReduceSinkDesc() {
+ }
+
+ public CorrelationLocalSimulativeReduceSinkDesc(java.util.ArrayList keyCols,
+ int numDistributionKeys,
+ java.util.ArrayList valueCols,
+ java.util.ArrayList outputKeyColumnNames,
+ List> distinctColumnIndices,
+ java.util.ArrayList outputValueColumnNames, int tag,
+ java.util.ArrayList partitionCols, int numReducers,
+ final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
+ this.keyCols = keyCols;
+ this.numDistributionKeys = numDistributionKeys;
+ this.valueCols = valueCols;
+ this.outputKeyColumnNames = outputKeyColumnNames;
+ this.outputValueColumnNames = outputValueColumnNames;
+ this.tag = tag;
+ this.numReducers = numReducers;
+ this.partitionCols = partitionCols;
+ this.keySerializeInfo = keySerializeInfo;
+ this.valueSerializeInfo = valueSerializeInfo;
+ this.distinctColumnIndices = distinctColumnIndices;
+ }
+
+ public CorrelationLocalSimulativeReduceSinkDesc(ReduceSinkDesc reduceSinkDesc){
+ this.keyCols = reduceSinkDesc.getKeyCols();
+ this.numDistributionKeys = reduceSinkDesc.getNumDistributionKeys();
+ this.valueCols = reduceSinkDesc.getValueCols();
+ this.outputKeyColumnNames = reduceSinkDesc.getOutputKeyColumnNames();
+ this.outputValueColumnNames = reduceSinkDesc.getOutputValueColumnNames();
+ this.tag = reduceSinkDesc.getTag();
+ this.numReducers = reduceSinkDesc.getNumReducers();
+ this.partitionCols = reduceSinkDesc.getPartitionCols();
+ this.keySerializeInfo = reduceSinkDesc.getKeySerializeInfo();
+ this.valueSerializeInfo = reduceSinkDesc.getValueSerializeInfo();
+ this.distinctColumnIndices = reduceSinkDesc.getDistinctColumnIndices();
+ }
+
+}
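A short usage sketch of the copy constructor above; originalRSOp is an illustrative placeholder for the ReduceSinkOperator whose shuffle the optimizer wants to simulate locally inside the reducer.

  // Clone the key/value/partition configuration of an existing ReduceSink so that a
  // CorrelationLocalSimulativeReduceSinkOperator can replay it without a real shuffle.
  ReduceSinkDesc originalDesc = originalRSOp.getConf();
  CorrelationLocalSimulativeReduceSinkDesc simDesc =
      new CorrelationLocalSimulativeReduceSinkDesc(originalDesc);
  // The descriptor is materialized into an operator through OperatorFactory, exactly like
  // the other descriptors registered there (see the OperatorFactory change below).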
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0)
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Correlation composite operator implementation.
+ * This operator is used only in the map phase.
+ * When multiple sub-queries involve a common table, CorrelationCompositeOperator
+ * is used so that they can share a single table scan.
+ * For example, suppose the common table is T and predicates P1 and P2 are used
+ * in sub-queries SQ1 and SQ2, respectively. The CorrelationCompositeOperator
+ * applies P1 and P2 to each record and tags the record according to which predicates hold.
+ **/
+public class CorrelationCompositeOperator extends Operator implements Serializable {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationCompositeOperator.class.getName());
+ static final private LogHelper console = new LogHelper(LOG);
+
+ public static enum Counter {
+ FORWARDED
+ }
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private ReduceSinkOperator correspondingReduceSinkOperators;
+
+ private transient final LongWritable forwarded_count;
+
+ private transient boolean firstRow;
+
+ public CorrelationCompositeOperator() {
+ super();
+ forwarded_count = new LongWritable();
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return "CCO";
+ }
+
+ private Object[] rowBuffer;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ firstRow = true;
+ rowBuffer = new Object[parentOperators.size()];
+ correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator();
+ allOperationPathTags = conf.getAllOperationPathTags();
+ statsMap.put(Counter.FORWARDED, forwarded_count);
+ outputObjInspector = ObjectInspectorUtils.getStandardObjectInspector(outputObjInspector, ObjectInspectorCopyOption.JAVA);
+
+ //initialize its children
+ initializeChildren(hconf);
+ }
+
+ private int[] allOperationPathTags;
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ rowBuffer[tag] = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[tag], ObjectInspectorCopyOption.JAVA);
+ }
+
+ @Override
+ public void setRowNumber(long rowNumber) {
+ this.rowNumber = rowNumber;
+ if (childOperators == null) {
+ return;
+ }
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ assert rowNumber >= childOperatorsArray[i].getRowNumber();
+ if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+ childOperatorsArray[i].setRowNumber(rowNumber);
+ }
+ }
+ if (firstRow) {
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ firstRow = false;
+ } else {
+ ArrayList operationPathTags = new ArrayList();
+ boolean isForward = false;
+ Object forwardedRow = null;
+ for (int i = 0; i < rowBuffer.length; i++) {
+ if (rowBuffer[i] != null){
+ isForward = true;
+ operationPathTags.add(allOperationPathTags[i]);
+ if (forwardedRow == null) {
+ forwardedRow = rowBuffer[i];
+ }
+ }
+ }
+ if (isForward) {
+ assert correspondingReduceSinkOperators != null;
+ correspondingReduceSinkOperators.setOperationPathTags(operationPathTags);
+ forwarded_count.set(forwarded_count.get() + 1);
+ try {
+ forward(forwardedRow, null);
+ } catch (HiveException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ forwardedRow = null;
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if(!abort){
+ ArrayList operationPathTags = new ArrayList();
+ boolean isForward = false;
+ Object forwardedRow = null;
+ for (int i = 0; i < rowBuffer.length; i++) {
+ if (rowBuffer[i] != null){
+ isForward = true;
+ operationPathTags.add(allOperationPathTags[i]);
+ if (forwardedRow == null) {
+ forwardedRow = rowBuffer[i];
+ }
+ }
+ }
+ if (isForward) {
+ assert correspondingReduceSinkOperators != null;
+ correspondingReduceSinkOperators.setOperationPathTags(operationPathTags);
+ forwarded_count.set(forwarded_count.get() + 1);
+ try {
+ forward(forwardedRow, null);
+ } catch (HiveException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ }
+ }
+
+ @Override
+ public OperatorType getType() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
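The buffering in processOp and setRowNumber above reduces to a simple merge: each parent (one per operation path) deposits its copy of the current row, and the record is forwarded once, together with the tags of the paths whose predicates accepted it. A compact, stand-alone illustration of that merge step with plain Java types instead of Hive operators:

import java.util.ArrayList;
import java.util.List;

public class OperationPathMergeSketch {

  // rowBuffer[i] is non-null when the predicate of operation path i accepted the current
  // record; the merged output is one copy of the record plus the list of passing path tags.
  // An empty list means the record is not forwarded at all.
  static List<Integer> passingPathTags(Object[] rowBuffer, int[] allOperationPathTags) {
    List<Integer> operationPathTags = new ArrayList<Integer>();
    for (int i = 0; i < rowBuffer.length; i++) {
      if (rowBuffer[i] != null) {
        operationPathTags.add(allOperationPathTags[i]);
      }
    }
    return operationPathTags;
  }

  public static void main(String[] args) {
    Object row = "some record";
    Object[] rowBuffer = new Object[] {row, null, row}; // predicates 0 and 2 matched
    System.out.println(passingPathTags(rowBuffer, new int[] {0, 1, 2})); // [0, 2]
  }
}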
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java (revision 0)
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+
+/**
+ * Correlation dispatch operator descriptor implementation.
+ *
+ */
+@Explain(displayName = "Correlation Dispatch Operator")
+public class CorrelationReducerDispatchDesc implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+
+ private HashMap>> dispatchConf;
+ private HashMap>> dispatchValueSelectDescConf;
+ private HashMap>> dispatchKeySelectDescConf;
+
+ public CorrelationReducerDispatchDesc(){
+ this.dispatchConf = new HashMap>>();
+ this.dispatchValueSelectDescConf = new HashMap>>();
+ this.dispatchKeySelectDescConf = new HashMap>>();
+
+ }
+
+ public CorrelationReducerDispatchDesc(HashMap>> dispatchConf){
+ this.dispatchConf = dispatchConf;
+ this.dispatchValueSelectDescConf = new HashMap>>();
+ this.dispatchKeySelectDescConf = new HashMap>>();
+ for(Entry>> entry: this.dispatchConf.entrySet()){
+ HashMap> tmp = new HashMap>();
+ for(Integer child: entry.getValue().keySet()){
+ tmp.put(child, new ArrayList());
+ tmp.get(child).add(new SelectDesc(true));
+ }
+ this.dispatchValueSelectDescConf.put(entry.getKey(), tmp);
+ this.dispatchKeySelectDescConf.put(entry.getKey(), tmp);
+ }
+ }
+
+ public CorrelationReducerDispatchDesc(HashMap>> dispatchConf,
+ HashMap>> dispatchKeySelectDescConf,
+ HashMap>> dispatchValueSelectDescConf){
+ this.dispatchConf = dispatchConf;
+ this.dispatchValueSelectDescConf = dispatchValueSelectDescConf;
+ this.dispatchKeySelectDescConf = dispatchKeySelectDescConf;
+ }
+
+ public void setDispatchConf(HashMap>> dispatchConf){
+ this.dispatchConf = dispatchConf;
+ }
+
+ public HashMap>> getDispatchConf(){
+ return this.dispatchConf;
+ }
+
+ public void setDispatchValueSelectDescConf(HashMap>> dispatchValueSelectDescConf){
+ this.dispatchValueSelectDescConf = dispatchValueSelectDescConf;
+ }
+
+ public HashMap>> getDispatchValueSelectDescConf(){
+ return this.dispatchValueSelectDescConf;
+ }
+
+ public void setDispatchKeySelectDescConf(HashMap>> dispatchKeySelectDescConf){
+ this.dispatchKeySelectDescConf = dispatchKeySelectDescConf;
+ }
+
+ public HashMap>> getDispatchKeySelectDescConf() {
+ return this.dispatchKeySelectDescConf;
+ }
+
+}
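A short, illustrative construction of the dispatch configuration consumed by the single-argument constructor above. The concrete tag numbers and their reading (output tag -1 for a single-parent child, 0 and 1 for the two sides of a join) are made up for the example, and java.util imports are assumed; the constructor fills the key and value SelectDescs with SELECT * (new SelectDesc(true)) for every child.

  // inputTag -> (child index -> list of output tags)
  HashMap<Integer, HashMap<Integer, ArrayList<Integer>>> dispatchConf =
      new HashMap<Integer, HashMap<Integer, ArrayList<Integer>>>();

  HashMap<Integer, ArrayList<Integer>> forInputTag0 = new HashMap<Integer, ArrayList<Integer>>();
  forInputTag0.put(0, new ArrayList<Integer>(Arrays.asList(-1))); // child 0: single-parent operator
  forInputTag0.put(1, new ArrayList<Integer>(Arrays.asList(0)));  // child 1: join, side 0
  dispatchConf.put(0, forInputTag0);

  HashMap<Integer, ArrayList<Integer>> forInputTag1 = new HashMap<Integer, ArrayList<Integer>>();
  forInputTag1.put(1, new ArrayList<Integer>(Arrays.asList(1)));  // child 1: join, side 1
  dispatchConf.put(1, forInputTag1);

  CorrelationReducerDispatchDesc dispatchDesc = new CorrelationReducerDispatchDesc(dispatchConf);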
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (working copy)
@@ -21,171 +21,45 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
/**
* Reduce Sink Operator sends output to the reduce stage.
**/
-public class ReduceSinkOperator extends TerminalOperator
+public class ReduceSinkOperator extends BaseReduceSinkOperator
implements Serializable {
private static final long serialVersionUID = 1L;
- /**
- * The evaluators for the key columns. Key columns decide the sort order on
- * the reducer side. Key columns are passed to the reducer in the "key".
- */
- protected transient ExprNodeEvaluator[] keyEval;
- /**
- * The evaluators for the value columns. Value columns are passed to reducer
- * in the "value".
- */
- protected transient ExprNodeEvaluator[] valueEval;
- /**
- * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
- * Hive language). Partition columns decide the reducer that the current row
- * goes to. Partition columns are not passed to reducer.
- */
- protected transient ExprNodeEvaluator[] partitionEval;
+ private final ArrayList operationPathTags = new ArrayList(); // operation path tags
+ private final byte[] operationPathTagsByte = new byte[1];
- // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
- // ready
- transient Serializer keySerializer;
- transient boolean keyIsText;
- transient Serializer valueSerializer;
- transient int tag;
- transient byte[] tagByte = new byte[1];
- transient protected int numDistributionKeys;
- transient protected int numDistinctExprs;
-
- @Override
- protected void initializeOp(Configuration hconf) throws HiveException {
-
- try {
- keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
- int i = 0;
- for (ExprNodeDesc e : conf.getKeyCols()) {
- keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- numDistributionKeys = conf.getNumDistributionKeys();
- distinctColIndices = conf.getDistinctColumnIndices();
- numDistinctExprs = distinctColIndices.size();
-
- valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
- i = 0;
- for (ExprNodeDesc e : conf.getValueCols()) {
- valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
- i = 0;
- for (ExprNodeDesc e : conf.getPartitionCols()) {
- partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- tag = conf.getTag();
- tagByte[0] = (byte) tag;
- LOG.info("Using tag = " + tag);
-
- TableDesc keyTableDesc = conf.getKeySerializeInfo();
- keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
- .newInstance();
- keySerializer.initialize(null, keyTableDesc.getProperties());
- keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
- TableDesc valueTableDesc = conf.getValueSerializeInfo();
- valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
- .newInstance();
- valueSerializer.initialize(null, valueTableDesc.getProperties());
-
- firstRow = true;
- initializeChildren(hconf);
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
+ public void setOperationPathTags(ArrayList operationPathTags) {
+ this.operationPathTags.addAll(operationPathTags);
+ int operationPathTagsInt = 0;
+ int tmp = 1;
+ for (Integer operationPathTag: operationPathTags) {
+ operationPathTagsInt += tmp << operationPathTag.intValue();
}
+ operationPathTagsByte[0] = (byte) operationPathTagsInt;
}
- transient InspectableObject tempInspectableObject = new InspectableObject();
- transient HiveKey keyWritable = new HiveKey();
- transient Writable value;
+ public ArrayList getOperationPathTags() {
+ return this.operationPathTags;
+ }
- transient StructObjectInspector keyObjectInspector;
- transient StructObjectInspector valueObjectInspector;
- transient ObjectInspector[] partitionObjectInspectors;
+ public ReduceSinkOperator(){
- transient Object[][] cachedKeys;
- transient Object[] cachedValues;
- transient List> distinctColIndices;
-
- boolean firstRow;
-
- transient Random random;
-
- /**
- * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
- * column indices for group by.
- * Puts the return values into a StructObjectInspector with output column
- * names.
- *
- * If distinctColIndices is empty, the object inspector is same as
- * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
- */
- protected static StructObjectInspector initEvaluatorsAndReturnStruct(
- ExprNodeEvaluator[] evals, List> distinctColIndices,
- List outputColNames,
- int length, ObjectInspector rowInspector)
- throws HiveException {
- int inspectorLen = evals.length > length ? length + 1 : evals.length;
- List sois = new ArrayList(inspectorLen);
-
- // keys
- ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
- sois.addAll(Arrays.asList(fieldObjectInspectors));
-
- if (evals.length > length) {
- // union keys
- List uois = new ArrayList();
- for (List distinctCols : distinctColIndices) {
- List names = new ArrayList();
- List eois = new ArrayList();
- int numExprs = 0;
- for (int i : distinctCols) {
- names.add(HiveConf.getColumnInternalName(numExprs));
- eois.add(evals[i].initialize(rowInspector));
- numExprs++;
- }
- uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
- }
- UnionObjectInspector uoi =
- ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
- sois.add(uoi);
- }
- return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois );
}
@Override
@@ -267,9 +141,18 @@
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.setSize(keyLength + 1);
+ } else {
+ keyWritable.setSize(keyLength + 2);
+ }
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.get()[keyLength] = tagByte[0];
+ } else {
+ keyWritable.get()[keyLength] = operationPathTagsByte[0];
+ keyWritable.get()[keyLength + 1] = tagByte[0];
+ }
}
} else {
// Must be BytesWritable
@@ -279,9 +162,18 @@
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
- System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.setSize(keyLength + 1);
+ } else {
+ keyWritable.setSize(keyLength + 2);
+ }
+ System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.get()[keyLength] = tagByte[0];
+ } else {
+ keyWritable.get()[keyLength] = operationPathTagsByte[0];
+ keyWritable.get()[keyLength + 1] = tagByte[0];
+ }
}
}
keyWritable.setHashCode(keyHashCode);
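With operation path tagging enabled, the serialized key grows by one byte: the path bitmask is written first and the usual reduce tag follows, so the layout becomes [key bytes][path-tags byte][tag byte] instead of [key bytes][tag byte]. The bitmask itself is the shift-and-add in setOperationPathTags; a tiny worked example:

public class OperationPathTagByteSketch {

  // Same accumulation as ReduceSinkOperator.setOperationPathTags: one bit per path tag.
  static byte encode(int... operationPathTags) {
    int bits = 0;
    for (int tag : operationPathTags) {
      bits += 1 << tag;
    }
    return (byte) bits;
  }

  public static void main(String[] args) {
    System.out.println(encode(0, 2)); // 5: the record belongs to operation paths 0 and 2
    System.out.println(encode(1));    // 2: only operation path 1
  }
}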
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (revision 0)
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+/**
+ * Correlation dispatch operator implementation.
+ * When used, CorrelationReducerDispatchOperator is the first operator in the reduce phase.
+ * It dispatches each record to the corresponding JOIN or GBY operators.
+ * If this dispatch operator has n children, an input record is evaluated by
+ * n DispatchHandlers, each of which selects the corresponding parts of the record
+ * and forwards them to the succeeding JOIN or GBY operator.
+ */
+public class CorrelationReducerDispatchOperator extends Operator implements Serializable{
+
+ private static final long serialVersionUID = 1L;
+ private static String[] fieldNames;
+ static {
+ ArrayList fieldNameArray = new ArrayList();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+ fieldNameArray.add(r.toString());
+ }
+ fieldNames = fieldNameArray.toArray(new String[0]);
+ }
+
+ protected static class DispatchHandler {
+
+ protected Log l4j = LogFactory.getLog(this.getClass().getName());
+
+ private final ObjectInspector[] inputObjInspector;
+ private ObjectInspector outputObjInspector;
+ private ObjectInspector keyObjInspector;
+ private ObjectInspector valueObjInspector;
+ private final byte inputTag;
+ private final byte outputTag;
+ private final byte childIndx;
+ private final ByteWritable outputTagByteWritable;
+ private final SelectDesc selectDesc;
+ private final SelectDesc keySelectDesc;
+ private ExprNodeEvaluator[] keyEval;
+ private ExprNodeEvaluator[] eval;
+
+ // counters for debugging
+ private transient long cntr = 0;
+ private transient long nextCntr = 1;
+
+ private long getNextCntr(long cntr) {
+ // A very simple counter to keep track of the number of rows processed by an
+ // operator. Before the first million rows it logs at exponentially growing
+ // intervals (1, 10, 100, ...); after that, every additional million rows.
+ if (cntr >= 1000000) {
+ return cntr + 1000000;
+ }
+
+ return 10 * cntr;
+ }
+
+ public long getCntr() {
+ return this.cntr;
+ }
+
+ private final Log LOG;
+ private final boolean isLogInfoEnabled;
+ private final String id;
+
+ public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx, byte outputTag,
+ SelectDesc selectDesc, SelectDesc keySelectDesc, Log LOG, String id)
+ throws HiveException {
+ this.inputObjInspector = inputObjInspector;
+ assert this.inputObjInspector.length == 1;
+ this.inputTag = inputTag;
+ this.childIndx = childIndx;
+ this.outputTag = outputTag;
+ this.selectDesc = selectDesc;
+ this.keySelectDesc = keySelectDesc;
+ this.outputTagByteWritable = new ByteWritable(outputTag);
+ this.LOG = LOG;
+ this.isLogInfoEnabled = LOG.isInfoEnabled();
+ this.id = id;
+ init();
+ }
+
+ private void init() throws HiveException {
+ ArrayList ois = new ArrayList();
+ if (keySelectDesc.isSelStarNoCompute()) {
+ ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(0));
+ } else {
+ ArrayList colList = this.keySelectDesc.getColList();
+ keyEval = new ExprNodeEvaluator[colList.size()];
+ for (int k = 0; k < colList.size(); k++) {
+ assert (colList.get(k) != null);
+ keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+ }
+ keyObjInspector = initEvaluatorsAndReturnStruct(keyEval, keySelectDesc
+ .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(0).getFieldObjectInspector());
+
+ ois.add(keyObjInspector);
+ l4j.info("Key: input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag"
+ + ((StructObjectInspector) inputObjInspector[0]).getTypeName());
+
+ }
+ if (selectDesc.isSelStarNoCompute()) {
+ ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(1));
+ } else {
+ ArrayList colList = this.selectDesc.getColList();
+ eval = new ExprNodeEvaluator[colList.size()];
+ for (int k = 0; k < colList.size(); k++) {
+ assert (colList.get(k) != null);
+ eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+ }
+ valueObjInspector = initEvaluatorsAndReturnStruct(eval, selectDesc
+ .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(1).getFieldObjectInspector());
+
+ ois.add(valueObjInspector);
+ l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag"
+ + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); //Yin
+
+ }
+ ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+ l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT outputObjInspector"
+ + ((StructObjectInspector) outputObjInspector).getTypeName()); //Yin
+ }
+
+ public ObjectInspector getOutputObjInspector() {
+ return outputObjInspector;
+ }
+
+ public Object process(Object row) throws HiveException {
+ Object[] keyOutput = (keyEval == null) ? null : new Object[keyEval.length];
+ Object[] valueOutput = (eval == null) ? null : new Object[eval.length];
+ ArrayList outputRow = new ArrayList(3);
+ List thisRow = (List)row;
+ if (keySelectDesc.isSelStarNoCompute()) {
+ outputRow.add(thisRow.get(0));
+ } else {
+ Object key = thisRow.get(0);
+ for (int j = 0; j < keyEval.length; j++) {
+ try {
+ keyOutput[j] = keyEval[j].evaluate(key);
+
+ } catch (HiveException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new HiveException("Error evaluating "
+ + keySelectDesc.getColList().get(j).getExprString(), e);
+ }
+ }
+ outputRow.add(keyOutput);
+ }
+
+ if (selectDesc.isSelStarNoCompute()) {
+ outputRow.add(thisRow.get(1));
+ } else {
+ Object value = thisRow.get(1);
+ for (int j = 0; j < eval.length; j++) {
+ try {
+ valueOutput[j] = eval[j].evaluate(value);
+ } catch (HiveException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new HiveException("Error evaluating "
+ + selectDesc.getColList().get(j).getExprString(), e);
+ }
+ }
+ outputRow.add(valueOutput);
+ }
+ outputRow.add(outputTagByteWritable);
+
+ if (isLogInfoEnabled) {
+ cntr++;
+ if (cntr == nextCntr) {
+ LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarding " + cntr + " rows");
+ nextCntr = getNextCntr(cntr);
+ }
+ }
+
+ return outputRow;
+ }
+
+ public void printCloseOpLog() {
+ LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarded " + cntr + " rows");
+ }
+
+ }
+
+ //inputTag->(Child->List)
+ private HashMap>> dispatchConf;
+ //inputTag->(Child->List)
+ private HashMap>> dispatchValueSelectDescConf;
+ //inputTag->(Child->List)
+ private HashMap>> dispatchKeySelectDescConf;
+ //inputTag->(Child->List)
+ private HashMap>> dispatchHandlers;
+ //Child->(outputTag->DispatchHandler)
+ private HashMap> child2OutputTag2DispatchHandlers;
+ //Child->Child's inputObjInspectors
+ private HashMap childInputObjInspectors;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+
+ dispatchConf = conf.getDispatchConf();
+ dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf();
+ dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf();
+ dispatchHandlers = new HashMap>>();
+ for (Entry>> entry: dispatchConf.entrySet()) {
+ HashMap> tmp = new HashMap>();
+ for (Entry> child2outputTag: entry.getValue().entrySet()) {
+ tmp.put(child2outputTag.getKey(), new ArrayList());
+ int indx = 0;
+ for (Integer outputTag: child2outputTag.getValue()) {
+ tmp.get(child2outputTag.getKey()).add(
+ new DispatchHandler(new ObjectInspector[]{inputObjInspectors[entry.getKey()]},
+ entry.getKey().byteValue(), child2outputTag.getKey().byteValue(), outputTag.byteValue(),
+ dispatchValueSelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx),
+ dispatchKeySelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), LOG, id));
+ indx++;
+ }
+ }
+ dispatchHandlers.put(entry.getKey(), tmp);
+ }
+
+ child2OutputTag2DispatchHandlers = new HashMap>();
+ for (Entry>> entry: dispatchConf.entrySet()) {
+ for (Entry> child2outputTag: entry.getValue().entrySet()) {
+ if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) {
+ child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(), new HashMap());
+ }
+ int indx = 0;
+ for (Integer outputTag: child2outputTag.getValue()) {
+ child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()).
+ put(outputTag, dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx));
+ indx++;
+ }
+
+ }
+ }
+
+ childInputObjInspectors = new HashMap();
+ for (Entry> entry:
+ child2OutputTag2DispatchHandlers.entrySet()) {
+ Integer l = Collections.max(entry.getValue().keySet());
+ ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1];
+ for (Entry e: entry.getValue().entrySet()) {
+ if (e.getKey().intValue() == -1) {
+ assert childObjInspectors.length == 1;
+ childObjInspectors[0] = e.getValue().getOutputObjInspector();
+ } else {
+ childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector();
+ }
+ }
+ childInputObjInspectors.put(entry.getKey(), childObjInspectors);
+ }
+
+ initializeChildren(hconf);
+ }
+
+ // Each child should have its own outputObjInspector
+ @Override
+ protected void initializeChildren(Configuration hconf) throws HiveException {
+ state = State.INIT;
+ LOG.info("Operator " + id + " " + getName() + " initialized");
+ if (childOperators == null) {
+ return;
+ }
+ LOG.info("Initializing children of " + id + " " + getName());
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " +
+ childOperatorsArray[i].getName() +
+ " " + childInputObjInspectors.get(i).length);
+ childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i));
+ if (reporter != null) {
+ childOperatorsArray[i].setReporter(reporter);
+ }
+ }
+ }
+
+ private int opTags;
+ private int inputTag;
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ ArrayList thisRow = (ArrayList)row;
+ assert thisRow.size() == 4;
+ opTags = ((ByteWritable)thisRow.get(3)).get();
+ inputTag = (int)((ByteWritable)thisRow.get(2)).get();
+ forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]);
+ }
+
+ @Override
+ public void forward(Object row, ObjectInspector rowInspector)
+ throws HiveException {
+ if ((++outputRows % 1000) == 0) {
+ if (counterNameToEnum != null) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ outputRows = 0;
+ }
+ }
+
+ if (childOperatorsArray == null && childOperators != null) {
+ throw new HiveException(
+ "Internal Hive error during operator initialization.");
+ }
+
+ if ((childOperatorsArray == null) || (getDone())) {
+ return;
+ }
+
+ int childrenDone = 0;
+ int forwardFlag = 1;
+ assert childOperatorsArray.length <= 8;
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ Operator extends Serializable> o = childOperatorsArray[i];
+ if (o.getDone()) {
+ childrenDone++;
+ } else {
+ if ((opTags & (forwardFlag << i)) != 0) {
+ for(int j = 0; j> childIndx2DispatchHandlers:
+ dispatchHandlers.values()) {
+ for (ArrayList dispatchHandlers: childIndx2DispatchHandlers.values()) {
+ for (DispatchHandler dispatchHandler: dispatchHandlers) {
+ dispatchHandler.printCloseOpLog();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void setGroupKeyObject(Object keyObject) {
+ this.groupKeyObject = keyObject;
+ for (Operator extends Serializable> op : childOperators) {
+ op.setGroupKeyObject(keyObject);
+ }
+ }
+
+ @Override
+ public OperatorType getType() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return "CDP";
+ }
+
+}
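On the reducer side, processOp pulls the operation-path byte out of the incoming row (thisRow.get(3)) and forward treats it as a bitmask over the dispatch operator's children: child i receives the row only when bit i is set, which is how a row tagged for paths 0 and 2 by the map-side sink reaches exactly those two pipelines. A small illustration of that test:

public class DispatchMaskSketch {

  // Mirrors the "(opTags & (1 << i)) != 0" style check in CorrelationReducerDispatchOperator.forward.
  static boolean childReceivesRow(int opTags, int childIndex) {
    return (opTags & (1 << childIndex)) != 0;
  }

  public static void main(String[] args) {
    int opTags = 5; // bits 0 and 2 were set by the map-side ReduceSinkOperator
    for (int i = 0; i < 3; i++) {
      System.out.println("child " + i + " -> " + childReceivesRow(opTags, i));
    }
    // child 0 -> true, child 1 -> false, child 2 -> true
  }
}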
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1224666)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -446,6 +446,7 @@
HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+ HIVEOPTCORRELATION("hive.optimize.correlation", false), // exploit intra-query correlations
// Indexes
HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G
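The new flag is off by default. A sketch of enabling it programmatically through the standard HiveConf accessors; the no-argument HiveConf constructor is assumed here, and the session-level equivalent is simply "set hive.optimize.correlation=true;".

import org.apache.hadoop.hive.conf.HiveConf;

public class EnableCorrelationOptimizer {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // HIVEOPTCORRELATION is the ConfVars entry added by this patch (hive.optimize.correlation).
    conf.setBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION, true);
    System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)); // true
  }
}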
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (working copy)
@@ -34,8 +34,8 @@
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
@@ -580,7 +580,7 @@
}
@Override
- protected boolean allInitializedParentsAreClosed() {
+ public boolean allInitializedParentsAreClosed() {
return true;
}
Index: ql/src/test/results/compiler/plan/groupby2.q.xml
===================================================================
--- ql/src/test/results/compiler/plan/groupby2.q.xml (revision 1224666)
+++ ql/src/test/results/compiler/plan/groupby2.q.xml (working copy)
@@ -177,6 +177,56 @@
+
+ VALUE._col1
+
+
+ _col3
+
+
+
+
+
+
+
+ double
+
+
+
+
+
+
+ KEY._col1:0._col0
+
+
+ _col1
+
+
+
+
+
+
+
+
+
+
+ VALUE._col0
+
+
+ _col2
+
+
+
+
+
+
+
+ bigint
+
+
+
+
+
@@ -198,17 +248,7 @@
-
-
- _col1
-
-
-
-
-
-
-
-
+
@@ -280,38 +320,10 @@
-
-
- _col2
-
-
-
-
-
-
-
- bigint
-
-
-
-
+
-
-
- _col3
-
-
-
-
-
-
-
- double
-
-
-
-
+
@@ -402,7 +414,7 @@
VALUE._col0
-
+
@@ -412,7 +424,7 @@
VALUE._col1
-
+
@@ -781,7 +793,7 @@
-
+
@@ -794,7 +806,7 @@
-
+
@@ -821,7 +833,7 @@
src
-
+
@@ -1044,7 +1056,7 @@
src
-
+
@@ -1343,7 +1355,7 @@
-
+
@@ -1397,7 +1409,7 @@
-
+
@@ -1423,7 +1435,7 @@
_col1
-
+ _col1
@@ -1431,13 +1443,13 @@
-
+ _col0
-
+ _col0
@@ -1456,10 +1468,10 @@
-
+
-
+
@@ -1533,7 +1545,7 @@
_col1
-
+
@@ -1562,7 +1574,7 @@
_col0
-
+ KEY._col0
@@ -1630,7 +1642,7 @@
VALUE._col1
-
+
@@ -1646,7 +1658,7 @@
-
+
@@ -1725,7 +1737,7 @@
-
+
@@ -1738,7 +1750,7 @@
-
+
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (working copy)
@@ -23,6 +23,9 @@
import java.util.List;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -91,6 +94,12 @@
HashTableDummyOperator.class));
opvec.add(new OpTuple(HashTableSinkDesc.class,
HashTableSinkOperator.class));
+ opvec.add(new OpTuple(CorrelationCompositeDesc.class,
+ CorrelationCompositeOperator.class));
+ opvec.add(new OpTuple(CorrelationReducerDispatchDesc.class,
+ CorrelationReducerDispatchOperator.class));
+ opvec.add(new OpTuple(CorrelationLocalSimulativeReduceSinkDesc.class,
+ CorrelationLocalSimulativeReduceSinkOperator.class));
}
public static Operator get(Class opClass) {
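With the three registrations above, the generic factory entry points handle the new descriptors like any other; a usage sketch, where dispatchConf and parentOp are illustrative placeholders:

  // Materialize a dispatch operator from its descriptor and attach it under an existing
  // parent, the same way the planner builds any other operator from its *Desc.
  CorrelationReducerDispatchDesc dispatchDesc = new CorrelationReducerDispatchDesc(dispatchConf);
  Operator<CorrelationReducerDispatchDesc> dispatchOp =
      OperatorFactory.getAndMakeChild(dispatchDesc, parentOp);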
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java (revision 0)
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Correlation Local Simulative Reduce Sink Operator sends output to another operator (e.g. JOIN or GBY).
+ * CorrelationLocalSimulativeReduceSinkOperator is used only in the reduce phase.
+ * Essentially, it is a bridge from one JOIN or GBY operator to another JOIN or GBY operator.
+ * A CorrelationLocalSimulativeReduceSinkOperator takes care of the startGroup and endGroup
+ * actions of its succeeding JOIN or GBY operator.
+ **/
+public class CorrelationLocalSimulativeReduceSinkOperator
+ extends BaseReduceSinkOperator
+ implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ transient TableDesc keyTableDesc;
+ transient TableDesc valueTableDesc;
+
+ transient Deserializer inputKeyDeserializer;
+
+ transient SerDe inputValueDeserializer;
+
+ transient ByteWritable tagWritable;
+
+ transient ObjectInspector outputKeyObjectInspector;
+ transient ObjectInspector outputValueObjectInspector;
+ transient ObjectInspector[] outputPartitionObjectInspectors;
+
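+ // Row forwarded to the single child operator: {deserialized key, deserialized value, tag},
+ // mimicking the KEY/VALUE/ALIAS layout that a real shuffle would hand to the reducer.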
+ private ArrayList forwardedRow;
+ private Object keyObject;
+ private Object valueObject;
+
+ private static String[] fieldNames;
+
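+ // Field names of the output struct, taken from Utilities.ReduceField (KEY, VALUE, ALIAS),
+ // so the forwarded rows look like ordinary reduce-side input to the child operator.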
+ static {
+ ArrayList fieldNameArray = new ArrayList();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+ fieldNameArray.add(r.toString());
+ }
+ fieldNames = fieldNameArray.toArray(new String[0]);
+ }
+
+ public CorrelationLocalSimulativeReduceSinkOperator(){
+
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ forwardedRow = new ArrayList(3);
+ tagByte = new byte[1];
+ tagWritable = new ByteWritable();
+ tempInspectableObject = new InspectableObject();
+ keyWritable = new HiveKey();
+ assert childOperatorsArray.length == 1;
+ try {
+ keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getKeyCols()) {
+ keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ numDistributionKeys = conf.getNumDistributionKeys();
+ distinctColIndices = conf.getDistinctColumnIndices();
+ numDistinctExprs = distinctColIndices.size();
+
+ valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getValueCols()) {
+ valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ tag = conf.getTag();
+ tagByte[0] = (byte) tag;
+ tagWritable.set(tagByte[0]);
+ LOG.info("Using tag = " + tag);
+
+ TableDesc keyTableDesc = conf.getKeySerializeInfo();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
+ keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+ .getDeserializerClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+ outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ TableDesc valueTableDesc = conf.getValueSerializeInfo();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ inputValueDeserializer = (SerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getDeserializerClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc
+ .getProperties());
+ outputValueObjectInspector = inputValueDeserializer.getObjectInspector();
+
+ ObjectInspector rowInspector = inputObjInspectors[0];
+
+ keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
+ distinctColIndices,
+ conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
+ valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
+ .getOutputValueColumnNames(), rowInspector);
+ int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+ int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
+ numDistributionKeys;
+ cachedKeys = new Object[numKeys][keyLen];
+ cachedValues = new Object[valueEval.length];
+ assert cachedKeys.length == 1;
+
+ ArrayList ois = new ArrayList();
+ ois.add(outputKeyObjectInspector);
+ ois.add(outputValueObjectInspector);
+ ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+
+ LOG.info("Simulative ReduceSink inputObjInspectors"
+ + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
+
+ LOG.info("Simulative ReduceSink outputObjInspectors "
+ + this.getChildOperators().get(0).getParentOperators().indexOf(this) +
+ " " + ((StructObjectInspector) outputObjInspector).getTypeName());
+
+ initializeChildren(hconf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
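+ // Current group key; used to detect group boundaries so that startGroup/endGroup can be
+ // propagated to the child JOIN or GBY operator.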
+ private BytesWritable groupKey;
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+
+ try {
+ // Evaluate the value
+ for (int i = 0; i < valueEval.length; i++) {
+ cachedValues[i] = valueEval[i].evaluate(row);
+ }
+ // Serialize the value
+ value = valueSerializer.serialize(cachedValues, valueObjectInspector);
+ valueObject = inputValueDeserializer.deserialize(value);
+
+ // Evaluate the keys
+ Object[] distributionKeys = new Object[numDistributionKeys];
+ for (int i = 0; i < numDistributionKeys; i++) {
+ distributionKeys[i] = keyEval[i].evaluate(row);
+ }
+
+ if (numDistinctExprs > 0) {
+ // with distinct key(s)
+ for (int i = 0; i < numDistinctExprs; i++) {
+ System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys);
+ Object[] distinctParameters =
+ new Object[distinctColIndices.get(i).size()];
+ for (int j = 0; j < distinctParameters.length; j++) {
+ distinctParameters[j] =
+ keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
+ }
+ cachedKeys[i][numDistributionKeys] =
+ new StandardUnion((byte)i, distinctParameters);
+ }
+ } else {
+ // no distinct key
+ System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+ }
+
+
+ for (int i = 0; i < cachedKeys.length; i++) {
+
+ if (keyIsText) {
+ Text key = (Text) keySerializer.serialize(cachedKeys[i],
+ keyObjectInspector);
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+
+ } else {
+ // Must be BytesWritable
+ BytesWritable key = (BytesWritable) keySerializer.serialize(
+ cachedKeys[i], keyObjectInspector);
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ }
+
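+ // A new key value: deserialize it, end the child's previous group if needed, and start
+ // a new group on the child before forwarding rows with this key.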
+ if (!keyWritable.equals(groupKey)) {
+ try {
+ keyObject = inputKeyDeserializer.deserialize(keyWritable);
+ } catch (Exception e) {
+ throw new HiveException(
+ "Hive Runtime Error: Unable to deserialize reduce input key from "
+ + Utilities.formatBinaryString(keyWritable.get(), 0,
+ keyWritable.getSize()) + " with properties "
+ + keyTableDesc.getProperties(), e);
+ }
+ if (groupKey == null) { // the first group
+ groupKey = new BytesWritable();
+ } else {
+ // if its child has not been ended, end it
+ if(!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())){
+ childOperatorsArray[0].endGroup();
+ }
+ }
+ groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+ if(!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())){
+ childOperatorsArray[0].startGroup();
+ childOperatorsArray[0].setGroupKeyObject(keyObject);
+ childOperatorsArray[0].setBytesWritableGroupKey(groupKey);
+ }
+
+ }
+
+ forwardedRow.clear();
+ forwardedRow.add(keyObject);
+ forwardedRow.add(valueObject);
+ forwardedRow.add(tagWritable);
+ forward(forwardedRow, outputObjInspector);
+ }
+ } catch (SerDeException e1) {
+ throw new HiveException(e1);
+ }
+
+ }
+
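+ // On a normal close, end the last group of the child operator once all of its initialized
+ // parents have been closed.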
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if(!abort){
+ //if(childOperatorsArray[0].getNumOfClosedParentOperators() == childOperatorsArray[0].getParentOperators().size() - 1){
+ if(childOperatorsArray[0].allInitializedParentsAreClosed()) {
+ childOperatorsArray[0].endGroup();
+ }
+ }
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ // do nothing
+ }
+
+ @Override
+ public void endGroup() throws HiveException {
+ // do nothing
+ }
+
+ @Override
+ public void setGroupKeyObject(Object keyObject) {
+ // do nothing
+
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return "CLSReduceSink";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return null;
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0)
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+
+
+/**
+ * Correlation composite operator Descriptor implementation.
+ *
+ */
+@Explain(displayName = "Correlation Composite Operator")
+public class CorrelationCompositeDesc implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
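+ /**
+ * The correlation ReduceSinkOperator created as the child of the composite operator for this
+ * operation path; set by the correlation optimizer when the shared plan is built.
+ */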
+ private ReduceSinkOperator correspondingReduceSinkOperator;
+
+ public CorrelationCompositeDesc(){
+
+ }
+
+ public CorrelationCompositeDesc(ReduceSinkOperator correspondingReduceSinkOperator){
+ this.correspondingReduceSinkOperator = correspondingReduceSinkOperator;
+ }
+
+ public void setCorrespondingReduceSinkOperator(
+ ReduceSinkOperator correspondingReduceSinkOperator){
+ this.correspondingReduceSinkOperator = correspondingReduceSinkOperator;
+ }
+
+ public ReduceSinkOperator getCorrespondingReduceSinkOperator(){
+ return correspondingReduceSinkOperator;
+ }
+
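+ /**
+ * Tags of all operation paths flowing through this composite operator, one per correlated
+ * ReduceSinkOperator merged into the shared table scan.
+ */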
+ private int[] allOperationPathTags;
+
+ public void setAllOperationPathTags(int[] allOperationPathTags){
+ this.allOperationPathTags = allOperationPathTags;
+ }
+
+ public int[] getAllOperationPathTags(){
+ return allOperationPathTags;
+ }
+
+}
Index: ql/src/test/results/compiler/plan/groupby5.q.xml
===================================================================
--- ql/src/test/results/compiler/plan/groupby5.q.xml (revision 1224666)
+++ ql/src/test/results/compiler/plan/groupby5.q.xml (working copy)
@@ -177,6 +177,24 @@
+
+ VALUE._col0
+
+
+ _col1
+
+
+
+
+
+
+
+ double
+
+
+
+
+
@@ -249,21 +267,7 @@
-
-
- _col1
-
-
-
-
-
-
-
- double
-
-
-
-
+
@@ -360,7 +364,7 @@
_col0
-
+ key
@@ -456,7 +460,7 @@
-
+
@@ -1101,7 +1105,7 @@
_col1
-
+ _col1
@@ -1115,7 +1119,7 @@
_col0
-
+ _col0
@@ -1134,10 +1138,10 @@
-
+
-
+
@@ -1221,7 +1225,7 @@
_col0
-
+ KEY._col0
@@ -1273,7 +1277,7 @@
-
+
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (revision 0)
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator;
+import org.apache.hadoop.hive.ql.exec.CorrelationLocalSimulativeReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.ForwardDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+
+
+public final class CorrelationOptimizerUtils {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName());
+
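+ /**
+ * Returns true if an expression with the same expression string is already in col_list.
+ */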
+ public static boolean isExisted(ExprNodeDesc expr, ArrayList col_list) {
+ for (ExprNodeDesc thisExpr: col_list) {
+ if (expr.getExprString().equals(thisExpr.getExprString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
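+ /**
+ * Returns the output column name mapped to the given expression in an operator's
+ * column-to-expression map, or null if no entry matches.
+ */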
+ public static String getColumnName(Map opColumnExprMap, ExprNodeDesc expr) {
+ for (Entry entry: opColumnExprMap.entrySet()) {
+ if (expr.getExprString().equals(entry.getValue().getExprString())) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+
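+ /**
+ * Collects the union of the columns used by the given correlated ReduceSinkOperators and
+ * creates a single shared SelectOperator over the new table scan that projects exactly those
+ * columns. If one of the operation paths starts with a filter, all columns are needed and the
+ * table scan itself is returned.
+ */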
+ public static Operator<? extends Serializable> unionUsedColumnsAndMakeNewSelect(ArrayList<ReduceSinkOperator> rsops,
+ IntraQueryCorrelation correlation, LinkedHashMap<Operator<? extends Serializable>,
+ Map<String, ExprNodeDesc>> originalOpColumnExprMap, TableScanOperator input, ParseContext pGraphContext,
+ LinkedHashMap<Operator<? extends Serializable>, OpParseContext> originalOpParseCtx) {
+
+ ArrayList columnNames = new ArrayList();
+ Map colExprMap = new HashMap();
+ ArrayList col_list = new ArrayList();
+ RowResolver out_rwsch = new RowResolver();
+ boolean isSelectAll = false;
+
+ int pos = 0;
+ for (ReduceSinkOperator rsop: rsops) {
+ Operator<? extends Serializable> curr = correlation.getBottom2TSops().get(rsop).get(0).getChildOperators().get(0);
+ while(true) {
+ if (curr.getName().equals("SEL")) {
+ SelectOperator selOp = (SelectOperator)curr;
+ if (selOp.getColumnExprMap() != null) {
+ for (Entry entry: selOp.getColumnExprMap().entrySet()) {
+ ExprNodeDesc expr = entry.getValue();
+ if (!isExisted(expr, col_list) && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().containsKey(entry.getKey())) {
+ col_list.add(expr);
+ String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().get(entry.getKey());
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = entry.getKey();
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ pos++;
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ }
+ }
+ } else {
+ for (ExprNodeDesc expr: selOp.getConf().getColList()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver().getInvRslvMap().get(expr.getCols().get(0));
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ }
+ break;
+ } else if (curr.getName().equals("FIL")) {
+ isSelectAll = true;
+ break;
+ } else if (curr.getName().equals("RS")) {
+ ReduceSinkOperator thisRSop = (ReduceSinkOperator)curr;
+ for (ExprNodeDesc expr: thisRSop.getConf().getKeyCols()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ assert expr.getCols().size() == 1;
+ String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ for (ExprNodeDesc expr: thisRSop.getConf().getValueCols()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ assert expr.getCols().size() == 1;
+ String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+
+ break;
+ } else {
+ curr = curr.getChildOperators().get(0);
+ }
+ }
+ }
+
+ Operator<? extends Serializable> output;
+ if (isSelectAll) {
+ output = input;
+ } else {
+ output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new SelectDesc(col_list, columnNames, false), new RowSchema(
+ out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx());
+ output.setColumnExprMap(colExprMap);
+ output.setChildOperators(Utilities.makeList());
+
+ }
+
+ return output;
+ }
+
+
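+ /**
+ * Registers the operator's RowResolver in the operator parse-context map, calls
+ * augmentPlan(), and returns the operator.
+ */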
+ public static Operator putOpInsertMap(Operator op,
+ RowResolver rr, LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+ OpParseContext ctx = new OpParseContext(rr);
+ opParseCtx.put(op, ctx);
+ op.augmentPlan();
+ return op;
+ }
+
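+ /**
+ * Inverts the alias-to-top-operator map so that each top (table scan) operator maps back to
+ * its alias ID.
+ */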
+ public static HashMap<Operator<? extends Serializable>, String> getAliasIDtTopOps(HashMap<String, Operator<? extends Serializable>> topOps) {
+ HashMap<Operator<? extends Serializable>, String> aliasIDtTopOps = new HashMap<Operator<? extends Serializable>, String>();
+ for (Entry<String, Operator<? extends Serializable>> entry: topOps.entrySet()) {
+ assert !aliasIDtTopOps.containsKey(entry.getValue());
+ aliasIDtTopOps.put(entry.getValue(), entry.getKey());
+ }
+ return aliasIDtTopOps;
+ }
+
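+ /**
+ * Finds all ReduceSinkOperators that feed the same (single) child operator as the given one,
+ * i.e. the peer reduce sinks of a JOIN or GBY.
+ */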
+ public static ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op) {
+
+ ArrayList peerReduceSinkOperators = new ArrayList();
+
+ List<Operator<? extends Serializable>> children = op.getChildOperators();
+ assert children.size() == 1;
+
+ for (Operator<? extends Serializable> parent: children.get(0).getParentOperators()) {
+ assert (parent instanceof ReduceSinkOperator);
+ peerReduceSinkOperators.add((ReduceSinkOperator)parent);
+ }
+
+ return peerReduceSinkOperators;
+ }
+
+ public static ArrayList findPeerFakeReduceSinkOperators(CorrelationLocalSimulativeReduceSinkOperator op) {
+
+ ArrayList peerReduceSinkOperators = new ArrayList();
+
+ List<Operator<? extends Serializable>> children = op.getChildOperators();
+ assert children.size() == 1;
+
+ for (Operator<? extends Serializable> parent: children.get(0).getParentOperators()) {
+ assert (parent instanceof CorrelationLocalSimulativeReduceSinkOperator);
+ peerReduceSinkOperators.add((CorrelationLocalSimulativeReduceSinkOperator)parent);
+ }
+
+ return peerReduceSinkOperators;
+ }
+
+ // Find how many layers of fake (local simulative) reduce sinks sit between the bottom and top reduce sinks.
+ public static int getPostComputationDepth(IntraQueryCorrelation correlation) {
+ int depth = 0;
+ for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) {
+ ReduceSinkOperator op = rsop;
+ int layer = 0;
+ while(!correlation.getTopReduceSinkOperators().contains(op)) {
+ assert correlation.getDown2upRSops().get(op).size() == 1;
+ op = correlation.getDown2upRSops().get(op).get(0);
+ layer++;
+ }
+ if (layer > depth) {
+ depth = layer;
+ }
+ }
+ assert depth >= 1;
+ return depth;
+ }
+
+
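+ /**
+ * Computes the number of reduce sink layers between the given bottom ReduceSinkOperator and
+ * the top reduce sinks of this correlation.
+ */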
+ public static int getPostComputationDepthOfThisPlan(ReduceSinkOperator rsop, IntraQueryCorrelation correlation) {
+ int depth = 0;
+ ReduceSinkOperator op = rsop;
+ while(!correlation.getTopReduceSinkOperators().contains(op)) {
+ assert correlation.getDown2upRSops().get(op).size() == 1;
+ op = correlation.getDown2upRSops().get(op).get(0);
+ depth++;
+ }
+ assert depth >= 1;
+ return depth;
+ }
+
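+ /**
+ * Rewrites the operator tree for one intra-query correlation (for example, a query that both
+ * groups and joins the same table on the same key): the correlated table scans are merged into
+ * one shared scan followed by a shared select, a CorrelationCompositeOperator and a single
+ * correlation reduce sink per table, a reducer-side dispatch operator is added, and the
+ * remaining intermediate reduce sinks are turned into local simulative reduce sinks.
+ */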
+ public static ParseContext applyCorrelation(IntraQueryCorrelation correlation, ParseContext inputpGraphContext,
+ LinkedHashMap<Operator<? extends Serializable>, Map<String, ExprNodeDesc>> originalOpColumnExprMap,
+ LinkedHashMap<Operator<? extends Serializable>, RowResolver> originalOpRowResolver,
+ Map groupbyRegular2MapSide,
+ LinkedHashMap<Operator<? extends Serializable>, OpParseContext> originalOpParseCtx) {
+
+ ParseContext pGraphContext = inputpGraphContext;
+
+ // 0: if necessary, replace RS-GBY to GBY-RS-GBY. In GBY-RS-GBY, the first GBY is in type of hash, so it can group records
+ LOG.info("apply correlation step 0: replace RS-GBY to GBY-RS-GBY");
+ for (ReduceSinkOperator rsop: correlation.getRSGBYToBeReplacedByGBYRSGBY()) {
+ LOG.info("operator " + rsop.getIdentifier() + " should be replaced");
+ assert !correlation.getBottomReduceSinkOperators().contains(rsop);
+ GroupByOperator mapSideGBY = groupbyRegular2MapSide.get(rsop);
+ assert (mapSideGBY.getChildOperators().get(0).getChildOperators().get(0) instanceof GroupByOperator);
+ ReduceSinkOperator newRsop = (ReduceSinkOperator)mapSideGBY.getChildOperators().get(0);
+ GroupByOperator reduceSideGBY = (GroupByOperator)newRsop.getChildOperators().get(0);
+ GroupByOperator oldReduceSideGBY = (GroupByOperator)rsop.getChildOperators().get(0);
+ List<Operator<? extends Serializable>> parents = rsop.getParentOperators();
+ List<Operator<? extends Serializable>> children = oldReduceSideGBY.getChildOperators();
+ mapSideGBY.setParentOperators(parents);
+ for (Operator<? extends Serializable> parent: parents) {
+ parent.replaceChild(rsop, mapSideGBY);
+ }
+ reduceSideGBY.setChildOperators(children);
+ for (Operator<? extends Serializable> child: children) {
+ child.replaceParent(oldReduceSideGBY, reduceSideGBY);
+ }
+ correlation.getAllReduceSinkOperators().remove(rsop);
+ correlation.getAllReduceSinkOperators().add(newRsop);
+ }
+
+
+ Operator<? extends Serializable> curr;
+
+ // 1: Create table scan operator
+ LOG.info("apply correlation step 1: create table scan operator");
+ HashMap oldTSOP2newTSOP = new HashMap();
+ HashMap<String, Operator<? extends Serializable>> oldTopOps = pGraphContext.getTopOps();
+ HashMap<Operator<? extends Serializable>, String> oldAliasIDtTopOps = getAliasIDtTopOps(oldTopOps);
+ HashMap oldTopToTable = pGraphContext.getTopToTable();
+ HashMap<String, Operator<? extends Serializable>> addedTopOps = new HashMap<String, Operator<? extends Serializable>>();
+ HashMap addedTopToTable = new HashMap();
+ for (Entry<String, ArrayList<TableScanOperator>> entry: correlation.getTable2CorrelatedTSops().entrySet()) {
+ TableScanOperator oldTSop = entry.getValue().get(0);
+ TableScanDesc tsDesc = new TableScanDesc(oldTSop.getConf().getAlias(), oldTSop.getConf().getVirtualCols());
+ tsDesc.setForwardRowNumber(true);
+ OpParseContext opParseCtx= pGraphContext.getOpParseCtx().get(oldTSop);
+ Operator<? extends Serializable> top = putOpInsertMap(OperatorFactory.get(tsDesc,
+ new RowSchema(opParseCtx.getRowResolver().getColumnInfos())),
+ opParseCtx.getRowResolver(), pGraphContext.getOpParseCtx());
+ top.setParentOperators(null);
+ top.setChildOperators(Utilities.makeList());
+ for (TableScanOperator tsop: entry.getValue()) {
+ addedTopOps.put(oldAliasIDtTopOps.get(tsop), top);
+ addedTopToTable.put((TableScanOperator) top, oldTopToTable.get(tsop));
+ oldTSOP2newTSOP.put(tsop, (TableScanOperator)top);
+ }
+ }
+
+ int postComputationDepth = getPostComputationDepth(correlation);
+ ArrayList<Operator<? extends Serializable>> childrenOfDispatch = new ArrayList<Operator<? extends Serializable>>();
+ for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) {
+ int thisPostComputationDepth = getPostComputationDepthOfThisPlan(rsop, correlation);
+ // TODO: currently, the correlation optimizer cannot handle the case where
+ // a table is directly connected to a post-computation operator.
+ assert correlation.getBottomReduceSinkOperators().containsAll(findPeerReduceSinkOperators(rsop));
+ Operator<? extends Serializable> op = rsop.getChildOperators().get(0);
+ if (!childrenOfDispatch.contains(op)) {
+ LOG.info("Add :" + op.getIdentifier() + " " + op.getName() + " to the children list of dispatch operator");
+ childrenOfDispatch.add(op);
+ }
+ }
+
+ int opTag = 0;
+ HashMap operationPath2CorrelationReduceSinkOps = new HashMap();
+ for (Entry<String, ArrayList<ReduceSinkOperator>> entry: correlation.getTable2CorrelatedRSops().entrySet()) {
+
+ // 2: Create select operator for shared op plans
+ LOG.info("apply correlation step 2: create select operator for shared operation path for the table of " + entry.getKey());
+ curr = unionUsedColumnsAndMakeNewSelect(entry.getValue(), correlation, originalOpColumnExprMap,
+ oldTSOP2newTSOP.get(correlation.getBottom2TSops().get(entry.getValue().get(0)).get(0)), pGraphContext,
+ originalOpParseCtx);
+
+ // 3: Create CorrelationCompositeOperator, CorrelationReduceSinkOperator
+ LOG.info("apply correlation step 3: create correlation composite Operator and correlation reduce sink operator for the table of " + entry.getKey());
+ curr = createCorrelationCompositeReduceSinkOperator(
+ correlation.getTable2CorrelatedTSops().get(entry.getKey()), entry.getValue(), correlation, curr, pGraphContext,
+ childrenOfDispatch, entry.getKey(), originalOpColumnExprMap, opTag, originalOpRowResolver);
+
+ operationPath2CorrelationReduceSinkOps.put(new Integer(opTag), (ReduceSinkOperator)curr);
+ opTag++;
+ }
+
+
+ // 4: Create correlation dispatch operator for operation paths
+ LOG.info("apply correlation step 4: create correlation dispatch operator for operation paths");
+ RowResolver outputRS = new RowResolver();
+ List<Operator<? extends Serializable>> correlationReduceSinkOps = new ArrayList<Operator<? extends Serializable>>();
+ for (Entry entry: operationPath2CorrelationReduceSinkOps.entrySet()) {
+ Integer opTagInteger = entry.getKey();
+ curr = entry.getValue();
+ correlationReduceSinkOps.add((ReduceSinkOperator)curr);
+ RowResolver inputRS = pGraphContext.getOpParseCtx().get(curr).getRowResolver();
+ for (Entry<String, LinkedHashMap<String, ColumnInfo>> e1: inputRS.getRslvMap().entrySet()) {
+ for (Entry e2: e1.getValue().entrySet()) {
+ outputRS.put(e1.getKey(), e2.getKey(), e2.getValue());
+ }
+ }
+ }
+
+ Operator<? extends Serializable> dispatchOp = putOpInsertMap(OperatorFactory.get(
+ new CorrelationReducerDispatchDesc(correlation.getDispatchConf(), correlation.getDispatchKeySelectDescConf(), correlation.getDispatchValueSelectDescConf()),
+ new RowSchema(outputRS.getColumnInfos())),
+ outputRS, pGraphContext.getOpParseCtx());
+
+ dispatchOp.setParentOperators(correlationReduceSinkOps);
+ for (Operator<? extends Serializable> thisOp: correlationReduceSinkOps) {
+ thisOp.setChildOperators(Utilities.makeList(dispatchOp));
+ }
+
+ // 5: Replace the old plan in the original plan tree with new plan
+ LOG.info("apply correlation step 5: Replace the old plan in the original plan tree with the new plan");
+ HashSet<Operator<? extends Serializable>> processed = new HashSet<Operator<? extends Serializable>>();
+ for (Operator<? extends Serializable> op: childrenOfDispatch) {
+ ArrayList<Operator<? extends Serializable>> parents = new ArrayList<Operator<? extends Serializable>>();
+ for (Operator<? extends Serializable> oldParent: op.getParentOperators()) {
+ if (!correlation.getBottomReduceSinkOperators().contains(oldParent)) {
+ parents.add(oldParent);
+ }
+ }
+ parents.add(dispatchOp);
+ op.setParentOperators(parents);
+ }
+ dispatchOp.setChildOperators(childrenOfDispatch);
+ HashMap<String, Operator<? extends Serializable>> newTopOps = new HashMap<String, Operator<? extends Serializable>>();
+ for (Entry<String, Operator<? extends Serializable>> entry: oldTopOps.entrySet()) {
+ if (addedTopOps.containsKey(entry.getKey())) {
+ newTopOps.put(entry.getKey(), addedTopOps.get(entry.getKey()));
+ } else {
+ newTopOps.put(entry.getKey(), entry.getValue());
+ }
+ }
+ pGraphContext.setTopOps(newTopOps);
+ HashMap newTopToTable = new HashMap();
+ for (Entry entry: oldTopToTable.entrySet()) {
+ if (addedTopToTable.containsKey(oldTSOP2newTSOP.get(entry.getKey()))) {
+ newTopToTable.put(oldTSOP2newTSOP.get(entry.getKey()),
+ addedTopToTable.get(oldTSOP2newTSOP.get(entry.getKey())));
+ } else {
+ newTopToTable.put(entry.getKey(), entry.getValue());
+ }
+ }
+ pGraphContext.setTopToTable(newTopToTable);
+
+ // 6: Change each JFC related ReduceSinkOperator to a CorrelationFakeReduceSinkOperator
+ LOG.info("apply correlation step 6: Change each JFC related reduce sink operator to a correlation fake reduce sink operator");
+ HashMap<Operator<? extends Serializable>, ArrayList<Operator<? extends Serializable>>> newParentsOfChildren =
+ new HashMap<Operator<? extends Serializable>, ArrayList<Operator<? extends Serializable>>>();
+ for (ReduceSinkOperator rsop: correlation.getAllReduceSinkOperators()) {
+ if (!correlation.getBottomReduceSinkOperators().contains(rsop)) {
+ Operator<? extends Serializable> childOP = rsop.getChildOperators().get(0);
+ Operator<? extends Serializable> parentOP = rsop.getParentOperators().get(0);
+ Operator<? extends Serializable> correlationFakeReduceSinkOperator = putOpInsertMap(OperatorFactory.get(
+ new CorrelationLocalSimulativeReduceSinkDesc(rsop.getConf()),
+ new RowSchema(pGraphContext.getOpParseCtx().get(rsop).getRowResolver().getColumnInfos())),
+ pGraphContext.getOpParseCtx().get(rsop).getRowResolver(), pGraphContext.getOpParseCtx());
+ correlationFakeReduceSinkOperator.setChildOperators(Utilities.makeList(childOP));
+ correlationFakeReduceSinkOperator.setParentOperators(Utilities.makeList(parentOP));
+ parentOP.getChildOperators().set(parentOP.getChildOperators().indexOf(rsop), correlationFakeReduceSinkOperator);
+ childOP.getParentOperators().set(childOP.getParentOperators().indexOf(rsop), correlationFakeReduceSinkOperator);
+ }
+ }
+
+ return pGraphContext;
+ }
+
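+ /**
+ * Builds the shared operation path for one table: a forward or filter chain per correlated
+ * ReduceSinkOperator, a CorrelationCompositeOperator on top of those chains, and a single
+ * correlation ReduceSinkOperator feeding the dispatch operator. Also records, per operation
+ * path, the dispatch and key/value select configurations consumed by the dispatch operator.
+ */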
+ public static Operator<? extends Serializable> createCorrelationCompositeReduceSinkOperator(
+ ArrayList<TableScanOperator> tsops, ArrayList<ReduceSinkOperator> rsops,
+ IntraQueryCorrelation correlation,
+ Operator<? extends Serializable> input, ParseContext pGraphContext,
+ ArrayList<Operator<? extends Serializable>> childrenOfDispatch, String tableName,
+ LinkedHashMap<Operator<? extends Serializable>, Map<String, ExprNodeDesc>> originalOpColumnExprMap, int newTag,
+ LinkedHashMap<Operator<? extends Serializable>, RowResolver> originalOpRowResolver) {
+
+ // Create CorrelationCompositeOperator
+ RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
+ ArrayList<Operator<? extends Serializable>> tops = new ArrayList<Operator<? extends Serializable>>();
+ ArrayList<Operator<? extends Serializable>> bottoms = new ArrayList<Operator<? extends Serializable>>();
+ ArrayList<Integer> opTags = new ArrayList<Integer>();
+
+ for (ReduceSinkOperator rsop: rsops) {
+ TableScanOperator tsop = correlation.getBottom2TSops().get(rsop).get(0);
+ Operator<? extends Serializable> curr = tsop.getChildOperators().get(0);
+ if (curr == rsop) {
+ // no filter needed, just forward
+ ForwardDesc forwardCtx = new ForwardDesc();
+ Operator forwardOp = OperatorFactory.get(ForwardDesc.class);
+ forwardOp.setConf(forwardCtx);
+ tops.add(forwardOp);
+ bottoms.add(forwardOp);
+ opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop));
+ } else {
+ // Add filter operator
+ FilterOperator currFilOp = null;
+ while(curr != rsop) {
+ if (curr.getName().equals("FIL")) {
+ FilterOperator fil = (FilterOperator)curr;
+ FilterDesc filterCtx = new FilterDesc(fil.getConf().getPredicate(), false);
+ Operator nowFilOp = OperatorFactory.get(FilterDesc.class);
+ nowFilOp.setConf(filterCtx);
+ if (currFilOp == null) {
+ currFilOp = (FilterOperator)nowFilOp;
+ tops.add(currFilOp);
+ } else {
+ nowFilOp.setParentOperators(Utilities.makeList(currFilOp));
+ currFilOp.setChildOperators(Utilities.makeList(nowFilOp));
+ currFilOp = (FilterOperator) nowFilOp;
+ }
+ }
+ curr = curr.getChildOperators().get(0);
+ }
+ if (currFilOp == null) {
+ ForwardDesc forwardCtx = new ForwardDesc();
+ Operator forwardOp = OperatorFactory.get(ForwardDesc.class);
+ forwardOp.setConf(forwardCtx);
+ tops.add(forwardOp);
+ bottoms.add(forwardOp);
+ } else {
+ bottoms.add(currFilOp);
+ }
+ opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop));
+
+ }
+ }
+
+ int[] opTagsArray = new int[opTags.size()];
+ for (int i = 0; i < opTags.size(); i++) {
+ opTagsArray[i] = opTags.get(i);
+ }
+ for (Operator<? extends Serializable> op : bottoms) {
+ op.setParentOperators(Utilities.makeList(input));
+ }
+ input.setChildOperators(bottoms);
+
+ CorrelationCompositeDesc ycoCtx = new CorrelationCompositeDesc();
+ ycoCtx.setAllOperationPathTags(opTagsArray);
+
+ Operator<? extends Serializable> ycop = putOpInsertMap(OperatorFactory.get(ycoCtx,
+ new RowSchema(inputRR.getColumnInfos())),
+ inputRR, pGraphContext.getOpParseCtx());
+ ycop.setParentOperators(tops);
+ for (Operator<? extends Serializable> op : tops) {
+ op.setChildOperators(Utilities.makeList(ycop));
+ }
+
+ // Create CorrelationReduceSinkOperator
+ ArrayList partitionCols = new ArrayList();
+ ArrayList keyCols = new ArrayList();
+ Map colExprMap = new HashMap();
+ ArrayList keyOutputColumnNames = new ArrayList();
+ ReduceSinkOperator firstRsop = rsops.get(0);
+
+ RowResolver firstRsopRS = pGraphContext.getOpParseCtx().get(firstRsop).getRowResolver();
+ RowResolver orginalFirstRsopRS = originalOpRowResolver.get(firstRsop);
+ RowResolver outputRS = new RowResolver();
+ HashMap keyCol2ExprForDispatch = new HashMap();
+ HashMap valueCol2ExprForDispatch = new HashMap();
+
+ for (ExprNodeDesc expr: firstRsop.getConf().getKeyCols()) {
+ assert expr instanceof ExprNodeColumnDesc;
+ ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr;
+ String outputName = getColumnName(originalOpColumnExprMap.get(firstRsop), expr);
+ ColumnInfo cinfo = orginalFirstRsopRS.getColumnInfos().get(orginalFirstRsopRS.getPosition(outputName));
+
+ String col = SemanticAnalyzer.getColumnInternalName(keyCols.size());
+ keyOutputColumnNames.add(col);
+ ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo
+ .getIsVirtualCol(), cinfo.isHiddenVirtualCol());
+
+ colExprMap.put(newColInfo.getInternalName(), expr);
+
+ outputRS.put(tableName, newColInfo.getInternalName(), newColInfo);
+ keyCols.add(expr);
+
+ keyCol2ExprForDispatch.put(encd.getColumn(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName,
+ encd.getIsPartitionColOrVirtualCol()));
+
+ }
+
+ ArrayList valueCols = new ArrayList();
+ ArrayList valueOutputColumnNames = new ArrayList();
+
+ correlation.addOperationPathToDispatchConf(newTag);
+ correlation.addOperationPathToDispatchKeySelectDescConf(newTag);
+ correlation.addOperationPathToDispatchValueSelectDescConf(newTag);
+
+
+ for (ReduceSinkOperator rsop: rsops) {
+ RowResolver rs = pGraphContext.getOpParseCtx().get(rsop).getRowResolver();
+ RowResolver orginalRS = originalOpRowResolver.get(rsop);
+ Integer childOpIndex = childrenOfDispatch.indexOf(rsop.getChildOperators().get(0));
+ int outputTag = rsop.getConf().getTag();
+ if (outputTag == -1) {
+ outputTag = 0;
+ }
+ if (!correlation.getDispatchConfForOperationPath(newTag).containsKey(childOpIndex)) {
+ correlation.getDispatchConfForOperationPath(newTag).put(childOpIndex, new ArrayList());
+ }
+ correlation.getDispatchConfForOperationPath(newTag).get(childOpIndex).add(outputTag);
+
+ ArrayList thisKeyColsInDispatch = new ArrayList();
+ ArrayList outputKeyNamesInDispatch = new ArrayList();
+ for (ExprNodeDesc expr: rsop.getConf().getKeyCols()) {
+ assert expr instanceof ExprNodeColumnDesc;
+ ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr;
+ String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr);
+ thisKeyColsInDispatch.add(keyCol2ExprForDispatch.get(encd.getColumn()));
+ String[] names = outputName.split("\\.");
+ outputKeyNamesInDispatch.add(names[1]);
+ }
+
+ if (!correlation.getDispatchKeySelectDescConfForOperationPath(newTag).containsKey(childOpIndex)) {
+ correlation.getDispatchKeySelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList());
+ }
+ correlation.getDispatchKeySelectDescConfForOperationPath(newTag).get(childOpIndex).
+ add(new SelectDesc(thisKeyColsInDispatch, outputKeyNamesInDispatch, false));
+
+ ArrayList thisValueColsInDispatch = new ArrayList();
+ ArrayList outputValueNamesInDispatch = new ArrayList();
+ for (ExprNodeDesc expr: rsop.getConf().getValueCols()) {
+
+ String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr);
+ ColumnInfo cinfo = orginalRS.getColumnInfos().get(orginalRS.getPosition(outputName));
+ if (!valueCol2ExprForDispatch.containsKey(expr.getExprString())) {
+
+ String col = SemanticAnalyzer.getColumnInternalName(keyCols.size() + valueCols.size());
+ valueOutputColumnNames.add(col);
+ ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo
+ .getIsVirtualCol(), cinfo.isHiddenVirtualCol());
+ colExprMap.put(newColInfo.getInternalName(), expr);
+ outputRS.put(tableName, newColInfo.getInternalName(), newColInfo);
+ valueCols.add(expr);
+
+ valueCol2ExprForDispatch.put(expr.getExprString(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName,
+ false));
+ }
+
+ thisValueColsInDispatch.add(valueCol2ExprForDispatch.get(expr.getExprString()));
+ String[] names = outputName.split("\\.");
+ outputValueNamesInDispatch.add(names[1]);
+ }
+
+ if (!correlation.getDispatchValueSelectDescConfForOperationPath(newTag).containsKey(childOpIndex)) {
+ correlation.getDispatchValueSelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList());
+ }
+ correlation.getDispatchValueSelectDescConfForOperationPath(newTag).get(childOpIndex).
+ add(new SelectDesc(thisValueColsInDispatch, outputValueNamesInDispatch, false));
+ }
+
+ ReduceSinkOperator rsOp = null;
+ try {
+ rsOp = (ReduceSinkOperator) putOpInsertMap(
+ OperatorFactory.getAndMakeChild(getReduceSinkDesc(keyCols,
+ keyCols.size(), valueCols, new ArrayList<List<Integer>>(),
+ keyOutputColumnNames, valueOutputColumnNames, true, newTag, keyCols.size(),
+ -1), new RowSchema(outputRS
+ .getColumnInfos()), ycop), outputRS, pGraphContext.getOpParseCtx());
+ rsOp.setColumnExprMap(colExprMap);
+ ((CorrelationCompositeOperator)ycop).getConf().setCorrespondingReduceSinkOperator(rsOp);
+ } catch (SemanticException e) {
+ // Should not happen for the constant "+" sort order built above; surface it if it does.
+ throw new RuntimeException(e);
+ }
+
+ return rsOp;
+ }
+
+
+ /**
+ * Create the correlation reduce sink descriptor.
+ *
+ * @param keyCols
+ * The columns to be stored in the key
+ * @param numKeys number of distribution keys, usually equal to the number
+ * of group-by keys.
+ * @param valueCols
+ * The columns to be stored in the value
+ * @param distinctColIndices
+ * column indices for distinct aggregates
+ * @param outputKeyColumnNames
+ * The output key columns names
+ * @param outputValueColumnNames
+ * The output value columns names
+ * @param tag
+ * The tag for this reducesink
+ * @param numPartitionFields
+ * The first numPartitionFields of keyCols will be partition columns.
+ * If numPartitionFields=-1, then partition randomly.
+ * @param numReducers
+ * The number of reducers, set to -1 for automatic inference based on
+ * input data size.
+ * @return The ReduceSinkDesc for the correlation reduce sink.
+ */
+ public static ReduceSinkDesc getReduceSinkDesc(
+ ArrayList keyCols, int numKeys,
+ ArrayList valueCols,
+ List<List<Integer>> distinctColIndices,
+ List outputKeyColumnNames, List outputValueColumnNames,
+ boolean includeKey, int tag,
+ int numPartitionFields, int numReducers) throws SemanticException {
+ ArrayList partitionCols = null;
+
+ if (numPartitionFields >= keyCols.size()) {
+ partitionCols = keyCols;
+ } else if (numPartitionFields >= 0) {
+ partitionCols = new ArrayList(numPartitionFields);
+ for (int i = 0; i < numPartitionFields; i++) {
+ partitionCols.add(keyCols.get(i));
+ }
+ } else {
+ // numPartitionFields = -1 means random partitioning
+ partitionCols = new ArrayList(1);
+ partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor
+ .getFuncExprNodeDesc("rand"));
+ }
+
+ StringBuilder order = new StringBuilder();
+ for (int i = 0; i < keyCols.size(); i++) {
+ order.append("+");
+ }
+
+ TableDesc keyTable = null;
+ TableDesc valueTable = null;
+ ArrayList outputKeyCols = new ArrayList();
+ ArrayList outputValCols = new ArrayList();
+ if (includeKey) {
+ keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnListWithLength(
+ keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""),
+ order.toString());
+ outputKeyCols.addAll(outputKeyColumnNames);
+ } else {
+ keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+ keyCols, "reducesinkkey"),order.toString());
+ for (int i = 0; i < keyCols.size(); i++) {
+ outputKeyCols.add("reducesinkkey" + i);
+ }
+ }
+ valueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+ valueCols, outputValueColumnNames, 0, ""));
+ outputValCols.addAll(outputValueColumnNames);
+
+ return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols,
+ distinctColIndices, outputValCols,
+ tag, partitionCols, numReducers, keyTable,
+ valueTable, true);
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (revision 1224666)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (working copy)
@@ -49,6 +49,16 @@
*/
private boolean gatherStats;
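+ /**
+ * Whether this table scan should also forward row numbers; set to true by the correlation
+ * optimizer on the shared table scans it creates.
+ */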
+ private boolean forwardRowNumber = false;
+
+ public boolean isForwardRowNumber() {
+ return forwardRowNumber;
+ }
+
+ public void setForwardRowNumber(boolean forwardRowNumber) {
+ this.forwardRowNumber = forwardRowNumber;
+ }
+
private ExprNodeDesc filterExpr;
public static final String FILTER_EXPR_CONF_STR =