### Eclipse Workspace Patch 1.0 #P hive-2206 Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1237326) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -450,6 +450,7 @@ HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), + HIVEOPTCORRELATION("hive.optimize.correlation", false), // exploit intra-query correlations // Indexes HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G Index: ql/src/test/results/compiler/plan/groupby5.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby5.q.xml (revision 1237326) +++ ql/src/test/results/compiler/plan/groupby5.q.xml (working copy) @@ -177,6 +177,24 @@ + + VALUE._col0 + + + _col1 + + + + + + + + double + + + + + @@ -249,21 +267,7 @@ - - - _col1 - - - - - - - - double - - - - + @@ -360,7 +364,7 @@ _col0 - + key @@ -456,7 +460,7 @@ - + @@ -1101,7 +1105,7 @@ _col1 - + _col1 @@ -1115,7 +1119,7 @@ _col0 - + _col0 @@ -1134,10 +1138,10 @@ - + - + @@ -1221,7 +1225,7 @@ _col0 - + KEY._col0 @@ -1273,7 +1277,7 @@ - + Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (working copy) @@ -21,171 +21,45 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Random; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.Serializer; -import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; /** * Reduce Sink Operator sends output to the reduce stage. **/ -public class ReduceSinkOperator extends TerminalOperator +public class ReduceSinkOperator extends BaseReduceSinkOperator implements Serializable { private static final long serialVersionUID = 1L; - /** - * The evaluators for the key columns. Key columns decide the sort order on - * the reducer side. 
Key columns are passed to the reducer in the "key". - */ - protected transient ExprNodeEvaluator[] keyEval; - /** - * The evaluators for the value columns. Value columns are passed to reducer - * in the "value". - */ - protected transient ExprNodeEvaluator[] valueEval; - /** - * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in - * Hive language). Partition columns decide the reducer that the current row - * goes to. Partition columns are not passed to reducer. - */ - protected transient ExprNodeEvaluator[] partitionEval; + private final ArrayList operationPathTags = new ArrayList(); // operation path tags + private final byte[] operationPathTagsByte = new byte[1]; - // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is - // ready - transient Serializer keySerializer; - transient boolean keyIsText; - transient Serializer valueSerializer; - transient int tag; - transient byte[] tagByte = new byte[1]; - transient protected int numDistributionKeys; - transient protected int numDistinctExprs; - - @Override - protected void initializeOp(Configuration hconf) throws HiveException { - - try { - keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; - int i = 0; - for (ExprNodeDesc e : conf.getKeyCols()) { - keyEval[i++] = ExprNodeEvaluatorFactory.get(e); - } - - numDistributionKeys = conf.getNumDistributionKeys(); - distinctColIndices = conf.getDistinctColumnIndices(); - numDistinctExprs = distinctColIndices.size(); - - valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; - i = 0; - for (ExprNodeDesc e : conf.getValueCols()) { - valueEval[i++] = ExprNodeEvaluatorFactory.get(e); - } - - partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()]; - i = 0; - for (ExprNodeDesc e : conf.getPartitionCols()) { - partitionEval[i++] = ExprNodeEvaluatorFactory.get(e); - } - - tag = conf.getTag(); - tagByte[0] = (byte) tag; - LOG.info("Using tag = " + tag); - - TableDesc keyTableDesc = conf.getKeySerializeInfo(); - keySerializer = (Serializer) keyTableDesc.getDeserializerClass() - .newInstance(); - keySerializer.initialize(null, keyTableDesc.getProperties()); - keyIsText = keySerializer.getSerializedClass().equals(Text.class); - - TableDesc valueTableDesc = conf.getValueSerializeInfo(); - valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() - .newInstance(); - valueSerializer.initialize(null, valueTableDesc.getProperties()); - - firstRow = true; - initializeChildren(hconf); - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); + public void setOperationPathTags(ArrayList operationPathTags) { + this.operationPathTags.addAll(operationPathTags); + int operationPathTagsInt = 0; + int tmp = 1; + for (Integer operationPathTag: operationPathTags) { + operationPathTagsInt += tmp << operationPathTag.intValue(); } + operationPathTagsByte[0] = (byte) operationPathTagsInt; } - transient InspectableObject tempInspectableObject = new InspectableObject(); - transient HiveKey keyWritable = new HiveKey(); - transient Writable value; + public ArrayList getOperationPathTags() { + return this.operationPathTags; + } - transient StructObjectInspector keyObjectInspector; - transient StructObjectInspector valueObjectInspector; - transient ObjectInspector[] partitionObjectInspectors; + public ReduceSinkOperator(){ - transient Object[][] cachedKeys; - transient Object[] cachedValues; - transient List> distinctColIndices; - - boolean firstRow; - - transient Random random; - - /** - * Initializes array of ExprNodeEvaluator. 
Adds Union field for distinct - * column indices for group by. - * Puts the return values into a StructObjectInspector with output column - * names. - * - * If distinctColIndices is empty, the object inspector is same as - * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} - */ - protected static StructObjectInspector initEvaluatorsAndReturnStruct( - ExprNodeEvaluator[] evals, List> distinctColIndices, - List outputColNames, - int length, ObjectInspector rowInspector) - throws HiveException { - int inspectorLen = evals.length > length ? length + 1 : evals.length; - List sois = new ArrayList(inspectorLen); - - // keys - ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector); - sois.addAll(Arrays.asList(fieldObjectInspectors)); - - if (evals.length > length) { - // union keys - List uois = new ArrayList(); - for (List distinctCols : distinctColIndices) { - List names = new ArrayList(); - List eois = new ArrayList(); - int numExprs = 0; - for (int i : distinctCols) { - names.add(HiveConf.getColumnInternalName(numExprs)); - eois.add(evals[i].initialize(rowInspector)); - numExprs++; - } - uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois)); - } - UnionObjectInspector uoi = - ObjectInspectorFactory.getStandardUnionObjectInspector(uois); - sois.add(uoi); - } - return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois ); } @Override @@ -267,9 +141,18 @@ keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); - keyWritable.setSize(keyLength + 1); + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.setSize(keyLength + 1); + } else { + keyWritable.setSize(keyLength + 2); + } System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = tagByte[0]; + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.get()[keyLength] = tagByte[0]; + } else { + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } } } else { // Must be BytesWritable @@ -279,9 +162,18 @@ keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); - keyWritable.setSize(keyLength + 1); - System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = tagByte[0]; + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.setSize(keyLength + 1); + } else { + keyWritable.setSize(keyLength + 2); + } + System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.get()[keyLength] = tagByte[0]; + } else { + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } } } keyWritable.setHashCode(keyHashCode); Index: ql/src/test/results/compiler/plan/groupby1.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby1.q.xml (revision 1237326) +++ ql/src/test/results/compiler/plan/groupby1.q.xml (working copy) @@ -311,6 +311,24 @@ + + VALUE._col0 + + + _col1 + + + + + + + + double + + + + + @@ -383,21 +401,7 @@ - - - _col1 - - - - - - - - double - - - - + @@ -494,7 +498,7 @@ _col0 - + key @@ -590,7 +594,7 @@ - + @@ -1214,7 +1218,7 @@ _col1 - + _col1 @@ -1228,7 +1232,7 @@ _col0 - + _col0 @@ -1247,10 +1251,10 @@ - + - + @@ -1328,7 +1332,7 @@ _col0 - + KEY._col0 @@ -1380,7 +1384,7 @@ - + 
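A minimal standalone sketch (not part of the patch) of the key layout introduced by the ReduceSinkOperator change above: setOperationPathTags() packs the operation path tags into a single byte, one bit per path, and processOp() appends that byte ahead of the usual tag byte whenever getNeedsOperationPathTagging() is true. The class and helper names below are illustrative only, and the sketch assumes at most eight operation paths per reducer (one bit each).

// Standalone illustration: mirrors how the patched ReduceSinkOperator packs
// operation path tags into one byte and appends it, plus the normal tag byte,
// to the serialized key when operation path tagging is needed.
import java.util.Arrays;
import java.util.List;

public class OperationPathTagSketch {

  // One bit per operation path; e.g. paths {0, 2} -> 0b00000101.
  static byte packOperationPathTags(List<Integer> operationPathTags) {
    int packed = 0;
    for (Integer pathTag : operationPathTags) {
      packed += 1 << pathTag.intValue();   // same arithmetic as setOperationPathTags()
    }
    return (byte) packed;
  }

  // Key layout with tagging: [serialized key bytes][operationPathTagsByte][tagByte]
  // Without tagging it is just:  [serialized key bytes][tagByte]
  static byte[] appendTags(byte[] serializedKey, byte operationPathTagsByte, byte tagByte,
                           boolean needsOperationPathTagging) {
    int extra = needsOperationPathTagging ? 2 : 1;
    byte[] keyWritable = Arrays.copyOf(serializedKey, serializedKey.length + extra);
    if (needsOperationPathTagging) {
      keyWritable[serializedKey.length] = operationPathTagsByte;
      keyWritable[serializedKey.length + 1] = tagByte;
    } else {
      keyWritable[serializedKey.length] = tagByte;
    }
    return keyWritable;
  }

  public static void main(String[] args) {
    byte pathTags = packOperationPathTags(Arrays.asList(0, 2)); // -> 0x05
    byte[] key = appendTags(new byte[]{0x01, 0x02}, pathTags, (byte) 1, true);
    System.out.println(Arrays.toString(key)); // [1, 2, 5, 1]
  }
}

For example, operation paths {0, 2} pack to 0x05, so a two-byte serialized key is emitted as [1, 2, 5, 1]; without path tagging the same row would be emitted as [1, 2, 1].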
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java (revision 0) @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Correlation Local Simulative Reduce Sink Operator sends output to another operator (e.g. JOIN or GBY). + * CorrelationLocalSimulativeReduceSinkOperator is used only in reduce phase. + * Basically, it is a bridge from one JOIN or GBY operator to another JOIN or GBY operator. + * A CorrelationLocalSimulativeReduceSinkOperator will take care actions of startGroup and endGroup of its + * succeeding JOIN or GBY operator. 
+ **/ +public class CorrelationLocalSimulativeReduceSinkOperator + extends BaseReduceSinkOperator + implements Serializable { + + private static final long serialVersionUID = 1L; + + transient TableDesc keyTableDesc; + transient TableDesc valueTableDesc; + + transient Deserializer inputKeyDeserializer; + + transient SerDe inputValueDeserializer; + + transient ByteWritable tagWritable; + + transient ObjectInspector outputKeyObjectInspector; + transient ObjectInspector outputValueObjectInspector; + transient ObjectInspector[] outputPartitionObjectInspectors; + + private ArrayList forwardedRow; + private Object keyObject; + private Object valueObject; + + private static String[] fieldNames; + + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + public CorrelationLocalSimulativeReduceSinkOperator(){ + + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + forwardedRow = new ArrayList(3); + tagByte = new byte[1]; + tagWritable = new ByteWritable(); + tempInspectableObject = new InspectableObject(); + keyWritable = new HiveKey(); + assert childOperatorsArray.length == 1; + try { + keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getKeyCols()) { + keyEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + numDistributionKeys = conf.getNumDistributionKeys(); + distinctColIndices = conf.getDistinctColumnIndices(); + numDistinctExprs = distinctColIndices.size(); + + valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getValueCols()) { + valueEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + tag = conf.getTag(); + tagByte[0] = (byte) tag; + tagWritable.set(tagByte[0]); + LOG.info("Using tag = " + tag); + + TableDesc keyTableDesc = conf.getKeySerializeInfo(); + keySerializer = (Serializer) keyTableDesc.getDeserializerClass() + .newInstance(); + keySerializer.initialize(null, keyTableDesc.getProperties()); + keyIsText = keySerializer.getSerializedClass().equals(Text.class); + + inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + .getDeserializerClass(), null); + inputKeyDeserializer.initialize(null, keyTableDesc.getProperties()); + outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector(); + + TableDesc valueTableDesc = conf.getValueSerializeInfo(); + valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() + .newInstance(); + valueSerializer.initialize(null, valueTableDesc.getProperties()); + + inputValueDeserializer = (SerDe) ReflectionUtils.newInstance( + valueTableDesc.getDeserializerClass(), null); + inputValueDeserializer.initialize(null, valueTableDesc + .getProperties()); + outputValueObjectInspector = inputValueDeserializer.getObjectInspector(); + + ObjectInspector rowInspector = inputObjInspectors[0]; + + keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, + distinctColIndices, + conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); + valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf + .getOutputValueColumnNames(), rowInspector); + int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; + int keyLen = numDistinctExprs > 0 ? 
numDistributionKeys + 1 : + numDistributionKeys; + cachedKeys = new Object[numKeys][keyLen]; + cachedValues = new Object[valueEval.length]; + assert cachedKeys.length == 1; + + ArrayList ois = new ArrayList(); + ois.add(outputKeyObjectInspector); + ois.add(outputValueObjectInspector); + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + + LOG.info("Simulative ReduceSink inputObjInspectors" + + ((StructObjectInspector) inputObjInspectors[0]).getTypeName()); + + LOG.info("Simulative ReduceSink outputObjInspectors " + + this.getChildOperators().get(0).getParentOperators().indexOf(this) + + " " + ((StructObjectInspector) outputObjInspector).getTypeName()); + + initializeChildren(hconf); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + private BytesWritable groupKey; + + @Override + public void processOp(Object row, int tag) throws HiveException { + + try { + // Evaluate the value + for (int i = 0; i < valueEval.length; i++) { + cachedValues[i] = valueEval[i].evaluate(row); + } + // Serialize the value + value = valueSerializer.serialize(cachedValues, valueObjectInspector); + valueObject = inputValueDeserializer.deserialize(value); + + // Evaluate the keys + Object[] distributionKeys = new Object[numDistributionKeys]; + for (int i = 0; i < numDistributionKeys; i++) { + distributionKeys[i] = keyEval[i].evaluate(row); + } + + if (numDistinctExprs > 0) { + // with distinct key(s) + for (int i = 0; i < numDistinctExprs; i++) { + System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys); + Object[] distinctParameters = + new Object[distinctColIndices.get(i).size()]; + for (int j = 0; j < distinctParameters.length; j++) { + distinctParameters[j] = + keyEval[distinctColIndices.get(i).get(j)].evaluate(row); + } + cachedKeys[i][numDistributionKeys] = + new StandardUnion((byte)i, distinctParameters); + } + } else { + // no distinct key + System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys); + } + + + for (int i = 0; i < cachedKeys.length; i++) { + + if (keyIsText) { + Text key = (Text) keySerializer.serialize(cachedKeys[i], + keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + + } else { + // Must be BytesWritable + BytesWritable key = (BytesWritable) keySerializer.serialize( + cachedKeys[i], keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + } + + if (!keyWritable.equals(groupKey)) { + try { + keyObject = inputKeyDeserializer.deserialize(keyWritable); + } catch (Exception e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input key from " + + Utilities.formatBinaryString(keyWritable.get(), 0, + keyWritable.getSize()) + " with properties " + + keyTableDesc.getProperties(), e); + } + if (groupKey == null) { // the first group + groupKey = new BytesWritable(); + } else { + // if its child has not been ended, end it + if(!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())){ + childOperatorsArray[0].endGroup(); + } + } + groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); + if(!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())){ + childOperatorsArray[0].startGroup(); + childOperatorsArray[0].setGroupKeyObject(keyObject); + childOperatorsArray[0].setBytesWritableGroupKey(groupKey); + } + + } + + forwardedRow.clear(); + forwardedRow.add(keyObject); + 
forwardedRow.add(valueObject); + forwardedRow.add(tagWritable); + forward(forwardedRow, outputObjInspector); + } + } catch (SerDeException e1) { + e1.printStackTrace(); + } + + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if(!abort){ + //if(childOperatorsArray[0].getNumOfClosedParentOperators() == childOperatorsArray[0].getParentOperators().size() - 1){ + if(childOperatorsArray[0].allInitializedParentsAreClosed()) { + childOperatorsArray[0].endGroup(); + } + } + } + + @Override + public void startGroup() throws HiveException { + // do nothing + } + + @Override + public void endGroup() throws HiveException { + // do nothing + } + + @Override + public void setGroupKeyObject(Object keyObject) { + // do nothing + + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CLSReduceSink"); + } + + @Override + public OperatorType getType() { + return null; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0) @@ -0,0 +1,845 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QB; +import org.apache.hadoop.hive.ql.parse.QBExpr; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; + +/** + * Implementation of correlation optimizer. The optimization is based on + * the paper + * "YSmart: Yet Another SQL-to-MapReduce Translator" (Rubao Lee et al.). + * This optimizer first detects three kinds of + * correlations, Input Correlation (IC), Transit Correlation (TC) and Job Flow Correlation (JFC), + * and then merge correlated MapReduce-jobs (MR-jobs) into one MR-job. + * Correlation correlation detection and query plan tree transformation is the last transformation in the + * {@link Optimizer}. Since opColumnExprMap, opParseCtx, opRowResolver may be changed by other optimizers, + * currently, correlation optimizer has two phases. The first phase is the first transformation in the + * {@link Optimizer}. In the first phase, original opColumnExprMap, opParseCtx, opRowResolver + * will be recorded. Then, the second phase (the last transformation) will perform correlation detection and + * query plan tree transformation. + * + *
+ * Correlations
+ *
+ * For the definitions of correlations, see the original paper.
+ *
+ * Rules
+ *
+ * The rules for merging correlated MR-jobs implemented in this correlation optimizer are:
+ *   • If an MR-job for a Join operation has the same partitioning keys as all of its preceding MR-jobs, the correlation optimizer
+ *     merges these MR-jobs into one MR-job.
+ *   • If an MR-job for a GroupBy and Aggregation operation has the same partitioning keys as its preceding MR-job, the correlation optimizer
+ *     merges these two MR-jobs into one MR-job.
+ * Note: In the current implementation, if the correlation optimizer detects that the MR-jobs of a sub-plan tree are correlated, it transforms
+ * this sub-plan tree into a single MR-job when the input of this sub-plan tree is not a temporary table. Otherwise, the current implementation
+ * ignores this sub-plan tree.
+ *
+ * Future Work
+ *
+ * Several enhancements would strengthen the correlation optimizer. Here are three examples:
+ *   • Add a new rule: if two MR-jobs share the same partitioning keys and have common input tables, merge these two
+ *     MR-jobs into a single MR-job.
+ *   • The current implementation detects MR-jobs which have the same partitioning keys as correlated MR-jobs. However, the condition
+ *     of same partitioning keys can be relaxed to common partitioning keys.
+ *   • The current implementation cannot optimize MR-jobs for aggregation functions with a DISTINCT keyword; support for these should be
+ *     added in a future implementation of the correlation optimizer.
  • + */ + +public class CorrelationOptimizer implements Transform { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName()); + private final HashMap AliastoTabName; + private final HashMap AliastoTab; + + public CorrelationOptimizer() { + super(); + AliastoTabName = new HashMap(); + AliastoTab = new HashMap(); + pGraphContext = null; + } + + private void initializeAliastoTabNameMapping(QB qb) { + for (String alias: qb.getAliases()) { + AliastoTabName.put(alias, qb.getTabNameForAlias(alias)); + AliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias)); + } + for (String subqalias: qb.getSubqAliases()) { + QBExpr qbexpr = qb.getSubqForAlias(subqalias); + initializeAliastoTabNameMapping(qbexpr.getQB()); + } + } + + protected ParseContext pGraphContext; + private LinkedHashMap, OpParseContext> opParseCtx; + private final LinkedHashMap, OpParseContext> originalOpParseCtx = + new LinkedHashMap, OpParseContext>(); + private final LinkedHashMap, RowResolver> originalOpRowResolver = + new LinkedHashMap, RowResolver>(); + private final LinkedHashMap, Map> originalOpColumnExprMap = + new LinkedHashMap, Map>(); + + private boolean isPhase1 = true; + + private Map groupbyRegular2MapSide; + + /** + * Transform the query tree. Firstly, find out correlations between operations. + * Then, group these operators in groups + * @param pactx + * current parse context + * @throws SemanticException + */ + public ParseContext transform(ParseContext pctx) throws SemanticException { + + if (isPhase1) { + + pGraphContext = pctx; + opParseCtx = pctx.getOpParseCtx(); + + CorrelationNodePhase1ProcCtx correlationCtx = new CorrelationNodePhase1ProcCtx(); + + Map opRules = new LinkedHashMap(); + + Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules, correlationCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + isPhase1 = false; + + } else { + + /* Types of correlations: + * 1) Input Correlation: Multiple nodes have input correlation + (IC) if their input relation sets are not disjoint; + 2) Transit Correlation: Multiple nodes have transit correlation + (TC) if they have not only input correlation, but + also the same partition key; + 3) Job Flow Correlation: A node has job flow correlation + (JFC) with one of its child nodes if it has the same + partition key as that child node. 
+ * */ + + pGraphContext = pctx; + opParseCtx = pctx.getOpParseCtx(); + + groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide(); + + initializeAliastoTabNameMapping(pGraphContext.getQB()); + + // 1: detect correlations + CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx(); + + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "RS%"), new CorrelationNodeProc()); + + Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + + // 2: transform the query plan tree + LOG.info("Begain query plan transformation based on intra-query correlations"); + for (IntraQueryCorrelation correlation: correlationCtx.getCorrelations()) { + pGraphContext = CorrelationOptimizerUtils.applyCorrelation( + correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver, + groupbyRegular2MapSide, originalOpParseCtx); + } + LOG.info("Finish query plan transformation based on intra-query correlations"); + + } + + return pGraphContext; + } + + + + private NodeProcessor getPhase1DefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + Operator op = (Operator)nd; + OpParseContext opCtx= opParseCtx.get(op); + + if (op.getColumnExprMap() != null) { + originalOpColumnExprMap.put(op, op.getColumnExprMap()); + } + originalOpParseCtx.put(op, opCtx); + originalOpRowResolver.put(op, opCtx.getRowResolver()); + + return null; + } + }; + } + + private class CorrelationNodeProc implements NodeProcessor { + + public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop) { + Operator op = rsop.getChildOperators().get(0); + while(!op.getName().equals("RS")) { + if (op.getName().equals("FS")) { + return null; + } + assert op.getChildOperators().size() <= 1; + op = op.getChildOperators().get(0); + } + return (ReduceSinkOperator)op; + } + + /** + * Find all peer ReduceSinkOperators (which have the same child operator of op) of op (op included). 
+ */ + private ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + private ArrayList findCorrelatedReduceSinkOperators(Operator op, + HashSet keyColumns, IntraQueryCorrelation correlation) throws Exception{ + + LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName()); + + ArrayList correlatedReduceSinkOps = new ArrayList(); + if (op.getParentOperators() == null) { + return correlatedReduceSinkOps; + } + if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)) { + assert op.getParentOperators().size() == 1; + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators( + (Operator)op.getParentOperators().get(0), keyColumns, correlation)); + } else if (originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)) { + HashSet newKeyColumns = new HashSet(); + for (String keyColumn: keyColumns) { + ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn); + if (col instanceof ExprNodeColumnDesc) { + newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn()); + } + } + + if (op.getName().equals("JOIN")) { + HashSet tableNeedToCheck = new HashSet(); + for (String keyColumn: keyColumns) { + for (ColumnInfo cinfo: originalOpParseCtx.get(op).getRowResolver().getColumnInfos()) { + if (keyColumn.equals(cinfo.getInternalName())) { + tableNeedToCheck.add(cinfo.getTabAlias()); + } + } + } + + for (Object parent: op.getParentOperators()) { + assert originalOpParseCtx.get(parent).getRowResolver().getTableNames().size() == 1; + for (String tbl: originalOpParseCtx.get(parent).getRowResolver().getTableNames()) { + if (tableNeedToCheck.contains(tbl)) { + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)parent, newKeyColumns, correlation)); + break; + } + } + } + + } else { + assert op.getParentOperators().size() == 1; + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)op.getParentOperators().get(0), newKeyColumns, correlation)); + } + + } else if (originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator) { + + HashSet newKeyColumns = new HashSet(); + for (String keyColumn: keyColumns) { + ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn); + if (col instanceof ExprNodeColumnDesc) { + newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn()); + } + } + + ReduceSinkOperator rsop = (ReduceSinkOperator)op; + HashSet thisKeyColumns = new HashSet(); + for (ExprNodeDesc key: rsop.getConf().getKeyCols()) { + if (key instanceof ExprNodeColumnDesc) { + thisKeyColumns.add(((ExprNodeColumnDesc)key).getColumn()); + } + } + + boolean isCorrelated = false; + Set intersection = new HashSet(newKeyColumns); + intersection.retainAll(thisKeyColumns); + // TODO: should use if intersection is empty to evaluate if two corresponding operators are correlated + //isCorrelated = !(intersection.isEmpty()); + isCorrelated = (intersection.size() == thisKeyColumns.size()); + + ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop); + + if (isCorrelated) { + if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("JOIN")) { + if 
(intersection.size() != nextChildReduceSinkOperator.getConf().getKeyCols().size() || + intersection.size() != rsop.getConf().getKeyCols().size()) { + isCorrelated = false; + } + } + } + + if (isCorrelated) { + LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is correlated"); + LOG.info("--keys of this operator: " + thisKeyColumns.toString()); + LOG.info("--keys of child operator: " + keyColumns.toString()); + LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString()); + if (((Operator)(op.getChildOperators().get(0))).getName().equals("JOIN")) { + ArrayList peers = findPeerReduceSinkOperators(rsop); + correlatedReduceSinkOps.addAll(peers); + } else { + correlatedReduceSinkOps.add(rsop); + } + // this if block are useful when we use "isCorrelated = !(intersection.isEmpty());" for the evaluation of isCorrelated + if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("GBY") && + (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())) { + LOG.info("--found this RS-GBY pattern that needs to be replaced to GBY-RS-GBY patterns " + + "common keys" + intersection.size() + ", keys of next group by operator" + nextChildReduceSinkOperator.getConf().getKeyCols().size()); + correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator); + } + + } else { + LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is not correlated"); + LOG.info("--keys of this operator: " + thisKeyColumns.toString()); + LOG.info("--keys of child operator: " + keyColumns.toString()); + LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString()); + correlatedReduceSinkOps.clear(); + correlation.getRSGBYToBeReplacedByGBYRSGBY().clear(); + } + } else { + throw new Exception("Correlation optimizer: ReduceSinkOperator " + op.getIdentifier() + " does not have ColumnExprMap"); + } + return correlatedReduceSinkOps; + } + + private ArrayList exploitJFC(ReduceSinkOperator op, CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation) { + + correlationCtx.addWalked(op); + correlation.addToAllReduceSinkOperators(op); + + ArrayList ReduceSinkOperators = new ArrayList(); + + boolean sholdDetect = true; + + ArrayList keys = op.getConf().getKeyCols(); + HashSet keyColumns = new HashSet(); + for (ExprNodeDesc key: keys) { + if (!(key instanceof ExprNodeColumnDesc)) { + sholdDetect = false; + } else { + keyColumns.add(((ExprNodeColumnDesc)key).getColumn()); + } + } + + if (sholdDetect) { + ArrayList newReduceSinkOperators = new ArrayList(); + for (Operator parent: op.getParentOperators()) { + try { + LOG.info("Operator " + op.getIdentifier() + ": start detecting correlation from this operator"); + LOG.info("--keys of this operator: " + keyColumns.toString()); + ArrayList correlatedReduceSinkOperators = + findCorrelatedReduceSinkOperators(parent, keyColumns, correlation); + if (correlatedReduceSinkOperators == null || correlatedReduceSinkOperators.size() == 0) { + newReduceSinkOperators.add(op); + } else { + ArrayList deduplicatedCorrelatedReduceSinkOperators = new ArrayList(); + for (ReduceSinkOperator rsop: correlatedReduceSinkOperators) { + if (!deduplicatedCorrelatedReduceSinkOperators.contains(rsop)) { + deduplicatedCorrelatedReduceSinkOperators.add(rsop); + } + } + for (ReduceSinkOperator rsop: deduplicatedCorrelatedReduceSinkOperators) { + if ( !correlation.getUp2downRSops().containsKey(op) ) { + correlation.getUp2downRSops().put(op, new ArrayList()); + } + 
correlation.getUp2downRSops().get(op).add(rsop); + + if ( !correlation.getDown2upRSops().containsKey(rsop)) { + correlation.getDown2upRSops().put(rsop, new ArrayList()); + } + correlation.getDown2upRSops().get(rsop).add(op); + ArrayList exploited = exploitJFC(rsop, correlationCtx, correlation); + if (exploited == null || exploited.size() == 0) { + newReduceSinkOperators.add(rsop); + } else { + newReduceSinkOperators.addAll(exploited); + } + } + } + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + ReduceSinkOperators.clear(); + ReduceSinkOperators.addAll(newReduceSinkOperators); + } + return ReduceSinkOperators; + } + + private TableScanOperator findTableScanOPerator(Operator startPoint) { + Operator thisOp = (Operator) startPoint.getParentOperators().get(0); + while(true) { + if (thisOp.getName().equals("RS")) { + return null; + } else if (thisOp.getName().equals("TS")) { + return (TableScanOperator)thisOp; + } + else { + if (thisOp.getParentOperators() != null) { + thisOp = (Operator) thisOp.getParentOperators().get(0); + } + else { + break; + } + } + } + return null; + } + + private void annotateOpPlan(IntraQueryCorrelation correlation) { + HashMap bottomReduceSink2OpPlanMap = new HashMap(); + int count = 0; + + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + if (!bottomReduceSink2OpPlanMap.containsKey(rsop)) { + bottomReduceSink2OpPlanMap.put(rsop, count); + for (ReduceSinkOperator peerRSop: findPeerReduceSinkOperators(rsop)) { + if (correlation.getBottomReduceSinkOperators().contains(peerRSop)) { + bottomReduceSink2OpPlanMap.put(peerRSop, count); + } + } + count++; + } + } + correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OpPlanMap); + } + + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, + Object... nodeOutputs) throws SemanticException { + LOG.info("Walk to operator " + ((Operator)nd).getIdentifier() + " " + ((Operator)nd).getName()); + + CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx)ctx; + + ReduceSinkOperator op = (ReduceSinkOperator)nd; + + if (correlationCtx.isWalked(op)) { + return null; + } + + if (op.getConf().getKeyCols().size() == 0 || + (!op.getChildOperators().get(0).getName().equals("JOIN") && + !op.getChildOperators().get(0).getName().equals("GBY"))) { + correlationCtx.addWalked(op); + return null; + } + + // 1: find out correlation + IntraQueryCorrelation correlation = new IntraQueryCorrelation(); + ArrayList peerReduceSinkOperators = findPeerReduceSinkOperators(op); + ArrayList bottomReduceSinkOperators = new ArrayList(); + for (ReduceSinkOperator rsop: peerReduceSinkOperators) { + + ArrayList thisBottomReduceSinkOperators= exploitJFC(rsop, correlationCtx, correlation); + + // TODO: if we use "isCorrelated = !(intersection.isEmpty());" in the method findCorrelatedReduceSinkOperators + // for the evaluation of isCorrelated, uncomment the following if block. The reduceSinkOperator at the top level + // should take special take, since we cannot evaluate if the relationship of the set of keys of this operator with + // that of its next parent reduceSinkOperator. 
+ // if (peerReduceSinkOperators.size() == 1) { + // correlation.addToRSGBYToBeReplacedByGBYRSGBY(rsop); + // } + if (thisBottomReduceSinkOperators.size() == 0) { + thisBottomReduceSinkOperators.add(rsop); + } else { + boolean isClear = false; + for (ReduceSinkOperator bottomrsop: thisBottomReduceSinkOperators) { + TableScanOperator tsop = findTableScanOPerator(bottomrsop); + if (tsop == null) { + isClear = true; // currently the optimizer can only optimize correlations involving source tables + } else { + if ( !correlation.getTop2TSops().containsKey(rsop) ) { + correlation.getTop2TSops().put(rsop, new ArrayList()); + } + correlation.getTop2TSops().get(rsop).add(tsop); + + if ( !correlation.getBottom2TSops().containsKey(bottomrsop)) { + correlation.getBottom2TSops().put(bottomrsop, new ArrayList()); + } + correlation.getBottom2TSops().get(bottomrsop).add(tsop); + } + } + if (isClear) { + thisBottomReduceSinkOperators.clear(); + thisBottomReduceSinkOperators.add(rsop); + } + } + bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators); + } + + if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) { + LOG.info("has job flow correlation"); + correlation.setJobFlowCorrelation(true); + correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators); + annotateOpPlan(correlation); + } + + if (correlation.hasJobFlowCorrelation()) { + boolean hasICandTC = findICandTC(correlation); + LOG.info("has input correlation and transit correlation? " + hasICandTC); + correlation.setInputCorrelation(hasICandTC); + correlation.setTransitCorrelation(hasICandTC); + boolean isInvolve = isInvolveSelfJoin(correlation); + LOG.info("involve self-join? " + isInvolve); + correlation.setInvolveSelfJoin(isInvolve); + //TODO: support self-join involved cases. For self-join related operation paths, after the correlation dispatch operator, each path should be filtered by a + // filter operator + if (!isInvolve) { + LOG.info("correlation detected"); + correlationCtx.addCorrelation(correlation); + } else { + LOG.info("correlation discarded. 
The current optimizer cannot optimize it"); + } + } + + correlationCtx.addWalkedAll(peerReduceSinkOperators); + + return null; + } + + private boolean isInvolveSelfJoin(IntraQueryCorrelation correlation) { + boolean isInvolve = false; + for (Entry> entry: correlation.getTable2CorrelatedRSops().entrySet()) { + for (ReduceSinkOperator rsop: entry.getValue()) { + HashSet intersection = new HashSet(findPeerReduceSinkOperators(rsop)); + intersection.retainAll(entry.getValue()); + + // if involve self-join + if (intersection.size() > 1) { + isInvolve = true; + return isInvolve; + } + } + } + return isInvolve; + } + + private boolean findICandTC(IntraQueryCorrelation correlation) { + + boolean hasICandTC = false; + HashMap> table2RSops = new HashMap>(); + HashMap> table2TSops = new HashMap>(); + + for (Entry> entry: correlation.getBottom2TSops().entrySet()) { + String tbl = AliastoTabName.get(entry.getValue().get(0).getConf().getAlias()); + if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)) { + table2RSops.put(tbl, new ArrayList()); + table2TSops.put(tbl, new ArrayList()); + } + assert entry.getValue().size() == 1; + table2RSops.get(tbl).add(entry.getKey()); + table2TSops.get(tbl).add(entry.getValue().get(0)); + } + + for (Entry> entry: table2RSops.entrySet()) { + if (entry.getValue().size() > 1) { + hasICandTC = true; + } + } + + correlation.setICandTCCorrelation(table2RSops, table2TSops); + + return hasICandTC; + } + } + + private NodeProcessor getDefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + LOG.info("Walk to operator " + ((Operator)nd).getIdentifier() + " " + ((Operator)nd).getName()); + return null; + } + }; + } + + private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx { + + } + + + public class IntraQueryCorrelation{ + + private final HashMap> down2upRSops = new HashMap>(); + private final HashMap> up2downRSops = new HashMap>(); + + private final HashMap> top2TSops = new HashMap>(); + private final HashMap> bottom2TSops = new HashMap>(); + + private ArrayList topReduceSinkOperators; + private ArrayList bottomReduceSinkOperators; + + private HashMap> table2CorrelatedRSops; + + private HashMap> table2CorrelatedTSops; + + private HashMap bottomReduceSink2OperationPathMap; + + private final HashMap>> dispatchConf = + new HashMap>>(); //inputTag->(Child->outputTag) + private final HashMap>> dispatchValueSelectDescConf = + new HashMap>>(); //inputTag->(Child->SelectDesc) + private final HashMap>> dispatchKeySelectDescConf = + new HashMap>>(); //inputTag->(Child->SelectDesc) + + private final HashSet allReduceSinkOperators = new HashSet(); + + // this set contains all ReduceSink-GroupBy operator-pairs that should be be replaced by GroupBy-ReduceSink-GroupBy pattern. + // the type of first GroupByOperator is hash type and this one will be used to group records. 
+ private final HashSet rSGBYToBeReplacedByGBYRSGBY = new HashSet(); + + public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop) { + rSGBYToBeReplacedByGBYRSGBY.add(rsop); + } + + public HashSet getRSGBYToBeReplacedByGBYRSGBY() { + return rSGBYToBeReplacedByGBYRSGBY; + } + + public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) { + allReduceSinkOperators.add(rsop); + } + + public HashSet getAllReduceSinkOperators() { + return allReduceSinkOperators; + } + + public HashMap>> getDispatchConf() { + return dispatchConf; + } + + public HashMap>> getDispatchValueSelectDescConf() { + return dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return dispatchKeySelectDescConf; + } + + public void addOperationPathToDispatchConf(Integer opPlan) { + if (!dispatchConf.containsKey(opPlan)) { + dispatchConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchConfForOperationPath(Integer opPlan) { + return dispatchConf.get(opPlan); + } + + public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) { + if (!dispatchValueSelectDescConf.containsKey(opPlan)) { + dispatchValueSelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchValueSelectDescConfForOperationPath(Integer opPlan) { + return dispatchValueSelectDescConf.get(opPlan); + } + + public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) { + if (!dispatchKeySelectDescConf.containsKey(opPlan)) { + dispatchKeySelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchKeySelectDescConfForOperationPath(Integer opPlan) { + return dispatchKeySelectDescConf.get(opPlan); + } + + private boolean inputCorrelation = false; + private boolean transitCorrelation = false; + private boolean jobFlowCorrelation = false; + + public void setBottomReduceSink2OperationPathMap(HashMap bottomReduceSink2OperationPathMap) { + this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap; + } + + public HashMap getBottomReduceSink2OperationPathMap() { + return bottomReduceSink2OperationPathMap; + } + + public void setInputCorrelation(boolean inputCorrelation) { + this.inputCorrelation = inputCorrelation; + } + + public boolean hasInputCorrelation() { + return inputCorrelation; + } + + public void setTransitCorrelation(boolean transitCorrelation) { + this.transitCorrelation = transitCorrelation; + } + + public boolean hasTransitCorrelation() { + return transitCorrelation; + } + + public void setJobFlowCorrelation(boolean jobFlowCorrelation) { + this.jobFlowCorrelation = jobFlowCorrelation; + } + + public boolean hasJobFlowCorrelation() { + return jobFlowCorrelation; + } + + public HashMap> getTop2TSops() { + return top2TSops; + } + + public HashMap> getBottom2TSops() { + return bottom2TSops; + } + + public HashMap> getDown2upRSops() { + return down2upRSops; + } + + public HashMap> getUp2downRSops() { + return up2downRSops; + } + + public void setJFCCorrelation(ArrayList peerReduceSinkOperators, + ArrayList bottomReduceSinkOperators) { + this.topReduceSinkOperators = peerReduceSinkOperators; + this.bottomReduceSinkOperators = bottomReduceSinkOperators; + } + + + public ArrayList getTopReduceSinkOperators() { + return topReduceSinkOperators; + } + + public ArrayList getBottomReduceSinkOperators() { + return bottomReduceSinkOperators; + } + + public void setICandTCCorrelation(HashMap> table2RSops, + HashMap> table2TSops) { + this.table2CorrelatedRSops = table2RSops; + this.table2CorrelatedTSops = table2TSops; + } 
+ + public HashMap> getTable2CorrelatedRSops() { + return table2CorrelatedRSops; + } + + public HashMap> getTable2CorrelatedTSops() { + return table2CorrelatedTSops; + } + + private boolean isInvolveSelfJoin = false; + + public boolean isInvolveSelfJoin() { + return isInvolveSelfJoin; + } + + public void setInvolveSelfJoin(boolean isInvolveSelfJoin) { + this.isInvolveSelfJoin = isInvolveSelfJoin; + } + + } + + private class CorrelationNodeProcCtx implements NodeProcessorCtx { + + private final HashSet walked = new HashSet(); + + private final ArrayList correlations = new ArrayList(); + + public void addCorrelation(IntraQueryCorrelation correlation) { + correlations.add(correlation); + } + + public ArrayList getCorrelations() { + return correlations; + } + + public boolean isWalked(ReduceSinkOperator op) { + return walked.contains(op); + } + + public void addWalked(ReduceSinkOperator op) { + walked.add(op); + } + + public void addWalkedAll(Collection c) { + walked.addAll(c); + } + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (working copy) @@ -34,8 +34,8 @@ import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; -import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; @@ -580,7 +580,7 @@ } @Override - protected boolean allInitializedParentsAreClosed() { + public boolean allInitializedParentsAreClosed() { return true; } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy) @@ -44,7 +44,15 @@ * @param hiveConf */ public void initialize(HiveConf hiveConf) { + CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer(); transformations = new ArrayList(); + // Add correlation optimizer for first phase query plan tree analysis. + // The first phase will record original opColumnExprMap, opParseCtx, opRowResolver, + // since these may be changed by other optimizers (e.g. entries in opColumnExprMap may be deleted). + // TODO: Make correlation optimizer 1 phase. + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)) { + transformations.add(correlationOptimizer); + } // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) { @@ -74,6 +82,11 @@ if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) { transformations.add(new ReduceSinkDeDuplication()); } + // The second phase of correlation optimizer used for correlation detection and query plan tree transformation. + // The second phase should be the last optimizer added into transformations. 
+ if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)) { + transformations.add(correlationOptimizer); + } } /** Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (revision 0) @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; + +/** + * Correlation dispatch operator implementation. + * If used, CorrelationReducerDispatchOperator is the first operator in reduce phase. + * It will dispatch the record to corresponding JOIN or GBY operators. + * Suppose there are n children of this dispatch operator, a input record will be + * evaluated by n DispatchHandler that is used to select the corresponding parts of a record + * and then forward to succeeding JOIN or GBY operators. 
+ */ +public class CorrelationReducerDispatchOperator extends Operator implements Serializable{ + + private static final long serialVersionUID = 1L; + private static String[] fieldNames; + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + protected static class DispatchHandler { + + protected Log l4j = LogFactory.getLog(this.getClass().getName()); + + private final ObjectInspector[] inputObjInspector; + private ObjectInspector outputObjInspector; + private ObjectInspector keyObjInspector; + private ObjectInspector valueObjInspector; + private final byte inputTag; + private final byte outputTag; + private final byte childIndx; + private final ByteWritable outputTagByteWritable; + private final SelectDesc selectDesc; + private final SelectDesc keySelectDesc; + private ExprNodeEvaluator[] keyEval; + private ExprNodeEvaluator[] eval; + + // counters for debugging + private transient long cntr = 0; + private transient long nextCntr = 1; + + private long getNextCntr(long cntr) { + // A very simple counter to keep track of number of rows processed by an + // operator. It dumps + // every 1 million times, and quickly before that + if (cntr >= 1000000) { + return cntr + 1000000; + } + + return 10 * cntr; + } + + public long getCntr() { + return this.cntr; + } + + private final Log LOG; + private final boolean isLogInfoEnabled; + private final String id; + + public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx, byte outputTag, + SelectDesc selectDesc, SelectDesc keySelectDesc, Log LOG, String id) + throws HiveException { + this.inputObjInspector = inputObjInspector; + assert this.inputObjInspector.length == 1; + this.inputTag = inputTag; + this.childIndx = childIndx; + this.outputTag = outputTag; + this.selectDesc = selectDesc; + this.keySelectDesc = keySelectDesc; + this.outputTagByteWritable = new ByteWritable(outputTag); + this.LOG = LOG; + this.isLogInfoEnabled = LOG.isInfoEnabled(); + this.id = id; + init(); + } + + private void init() throws HiveException { + ArrayList ois = new ArrayList(); + if (keySelectDesc.isSelStarNoCompute()) { + ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(0)); + } else { + ArrayList colList = this.keySelectDesc.getColList(); + keyEval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + keyObjInspector = initEvaluatorsAndReturnStruct(keyEval, keySelectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(0).getFieldObjectInspector()); + + ois.add(keyObjInspector); + l4j.info("Key: input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); + + } + if (selectDesc.isSelStarNoCompute()) { + ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(1)); + } else { + ArrayList colList = this.selectDesc.getColList(); + eval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + valueObjInspector = initEvaluatorsAndReturnStruct(eval, selectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) 
inputObjInspector[0]).getAllStructFieldRefs().get(1).getFieldObjectInspector()); + + ois.add(valueObjInspector); + l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); //Yin + + } + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT outputObjInspector" + + ((StructObjectInspector) outputObjInspector).getTypeName()); //Yin + } + + public ObjectInspector getOutputObjInspector() { + return outputObjInspector; + } + + public Object process(Object row) throws HiveException { + Object[] keyOutput = new Object[keyEval.length]; + Object[] valueOutput = new Object[eval.length]; + ArrayList outputRow = new ArrayList(3); + List thisRow = (List)row; + if (keySelectDesc.isSelStarNoCompute()) { + outputRow.add(thisRow.get(0)); + } else { + Object key = thisRow.get(0); + for (int j = 0; j < keyEval.length; j++) { + try { + keyOutput[j] = keyEval[j].evaluate(key); + + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new HiveException("Error evaluating " + + keySelectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(keyOutput); + } + + if (selectDesc.isSelStarNoCompute()) { + outputRow.add(thisRow.get(1)); + } else { + Object value = thisRow.get(1); + for (int j = 0; j < eval.length; j++) { + try { + valueOutput[j] = eval[j].evaluate(value); + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new HiveException("Error evaluating " + + selectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(valueOutput); + } + outputRow.add(outputTagByteWritable); + + if (isLogInfoEnabled) { + cntr++; + if (cntr == nextCntr) { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarding " + cntr + " rows"); + nextCntr = getNextCntr(cntr); + } + } + + return outputRow; + } + + public void printCloseOpLog() { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarded " + cntr + " rows"); + } + + } + + //inputTag->(Child->List) + private HashMap>> dispatchConf; + //inputTag->(Child->List) + private HashMap>> dispatchValueSelectDescConf; + //inputTag->(Child->List) + private HashMap>> dispatchKeySelectDescConf; + //inputTag->(Child->List) + private HashMap>> dispatchHandlers; + //Child->(outputTag->DispatchHandler) + private HashMap> child2OutputTag2DispatchHandlers; + //Child->Child's inputObjInspectors + private HashMap childInputObjInspectors; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + + dispatchConf = conf.getDispatchConf(); + dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf(); + dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf(); + dispatchHandlers = new HashMap>>(); + for (Entry>> entry: dispatchConf.entrySet()) { + HashMap> tmp = new HashMap>(); + for (Entry> child2outputTag: entry.getValue().entrySet()) { + tmp.put(child2outputTag.getKey(), new ArrayList()); + int indx = 0; + for (Integer outputTag: child2outputTag.getValue()) { + tmp.get(child2outputTag.getKey()).add( + new DispatchHandler(new ObjectInspector[]{inputObjInspectors[entry.getKey()]}, + entry.getKey().byteValue(), 
child2outputTag.getKey().byteValue(), outputTag.byteValue(), + dispatchValueSelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), + dispatchKeySelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), LOG, id)); + indx++; + } + } + dispatchHandlers.put(entry.getKey(), tmp); + } + + child2OutputTag2DispatchHandlers = new HashMap>(); + for (Entry>> entry: dispatchConf.entrySet()) { + for (Entry> child2outputTag: entry.getValue().entrySet()) { + if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) { + child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(), new HashMap()); + } + int indx = 0; + for (Integer outputTag: child2outputTag.getValue()) { + child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()). + put(outputTag, dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx)); + indx++; + } + + } + } + + childInputObjInspectors = new HashMap(); + for (Entry> entry: + child2OutputTag2DispatchHandlers.entrySet()) { + Integer l = Collections.max(entry.getValue().keySet()); + ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1]; + for (Entry e: entry.getValue().entrySet()) { + if (e.getKey().intValue() == -1) { + assert childObjInspectors.length == 1; + childObjInspectors[0] = e.getValue().getOutputObjInspector(); + } else { + childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector(); + } + } + childInputObjInspectors.put(entry.getKey(), childObjInspectors); + } + + initializeChildren(hconf); + } + + //Each child should has its own outputObjInspector + @Override + protected void initializeChildren(Configuration hconf) throws HiveException { + state = State.INIT; + LOG.info("Operator " + id + " " + getName() + " initialized"); + if (childOperators == null) { + return; + } + LOG.info("Initializing children of " + id + " " + getName()); + for (int i = 0; i < childOperatorsArray.length; i++) { + LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " + + childOperatorsArray[i].getName() + + " " + childInputObjInspectors.get(i).length); + childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i)); + if (reporter != null) { + childOperatorsArray[i].setReporter(reporter); + } + } + } + + private int opTags; + private int inputTag; + + @Override + public void processOp(Object row, int tag) throws HiveException { + ArrayList thisRow = (ArrayList)row; + assert thisRow.size() == 4; + opTags = ((ByteWritable)thisRow.get(3)).get(); + inputTag = (int)((ByteWritable)thisRow.get(2)).get(); + forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]); + } + + @Override + public void forward(Object row, ObjectInspector rowInspector) + throws HiveException { + if ((++outputRows % 1000) == 0) { + if (counterNameToEnum != null) { + incrCounter(numOutputRowsCntr, outputRows); + outputRows = 0; + } + } + + if (childOperatorsArray == null && childOperators != null) { + throw new HiveException( + "Internal Hive error during operator initialization."); + } + + if ((childOperatorsArray == null) || (getDone())) { + return; + } + + int childrenDone = 0; + int forwardFLag = 1; + assert childOperatorsArray.length <= 8; + for (int i = 0; i < childOperatorsArray.length; i++) { + Operator o = childOperatorsArray[i]; + if (o.getDone()) { + childrenDone++; + } else { + if ((opTags & (forwardFLag << i)) != 0){ + for(int j = 0; j> childIndx2DispatchHandlers: + dispatchHandlers.values()) { + for (ArrayList dispatchHandlers: 
childIndx2DispatchHandlers.values()) { + for (DispatchHandler dispatchHandler: dispatchHandlers) { + dispatchHandler.printCloseOpLog(); + } + } + } + } + + @Override + public void setGroupKeyObject(Object keyObject) { + this.groupKeyObject = keyObject; + for (Operator op : childOperators) { + op.setGroupKeyObject(keyObject); + } + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CDP"); + } + +} Index: ql/src/test/results/compiler/plan/groupby3.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby3.q.xml (revision 1237326) +++ ql/src/test/results/compiler/plan/groupby3.q.xml (working copy) @@ -158,7 +158,125 @@ - + + + VALUE._col4 + + + _col5 + + + + + + + + string + + + + + + + VALUE._col3 + + + _col4 + + + + + + + + + + + VALUE._col2 + + + _col3 + + + + + + + + + + count + + + sum + + + + + + + + + bigint + + + + + + + double + + + + + + + + + + + VALUE._col1 + + + _col2 + + + + + + + + + + + VALUE._col0 + + + _col1 + + + + + + + + + + + KEY._col0:0._col0 + + + _col0 + + + + + + + + + + @@ -176,21 +294,7 @@ - - - _col0 - - - - - - - - string - - - - + @@ -261,98 +365,19 @@ - - - _col1 - - - - - - - - double - - - - + - - - _col2 - - - - - - - - - - count - - - sum - - - - - - - - - bigint - - - - - - - - - - - + - - - _col3 - - - - - - - - + - - - _col4 - - - - - - - - + - - - _col5 - - - - - - - - + @@ -433,7 +458,7 @@ VALUE._col0 - + @@ -966,7 +991,7 @@ - + @@ -1045,7 +1070,7 @@ src - + @@ -1246,7 +1271,7 @@ src - + @@ -1532,7 +1557,7 @@ - + @@ -1545,7 +1570,7 @@ - + @@ -1558,7 +1583,7 @@ - + @@ -1600,7 +1625,7 @@ _col4 - + _col4 @@ -1614,7 +1639,7 @@ _col3 - + _col3 @@ -1628,7 +1653,7 @@ _col2 - + _col2 @@ -1636,13 +1661,13 @@ - + _col1 - + _col1 @@ -1650,13 +1675,13 @@ - + _col0 - + _col0 @@ -1664,7 +1689,7 @@ - + @@ -1675,19 +1700,19 @@ - + - + - + - + - + @@ -1751,7 +1776,7 @@ _col0 - + @@ -1764,7 +1789,7 @@ _col1 - + @@ -1777,7 +1802,7 @@ _col2 - + @@ -1843,7 +1868,7 @@ VALUE._col0 - + @@ -2040,7 +2065,7 @@ - + @@ -2053,7 +2078,7 @@ - + @@ -2066,7 +2091,7 @@ - + Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0) @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; + + +/** + * Correlation composite operator descriptor implementation. + * + */ +@Explain(displayName = "Correlation Composite Operator") +public class CorrelationCompositeDesc implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private ReduceSinkOperator correspondingReduceSinkOperator; + + public CorrelationCompositeDesc(){ + + } + + public CorrelationCompositeDesc(ReduceSinkOperator correspondingReduceSinkOperator){ + this.correspondingReduceSinkOperator = correspondingReduceSinkOperator; + } + + public void setCorrespondingReduceSinkOperator( + ReduceSinkOperator correspondingReduceSinkOperator){ + this.correspondingReduceSinkOperator = correspondingReduceSinkOperator; + } + + public ReduceSinkOperator getCorrespondingReduceSinkOperator(){ + return correspondingReduceSinkOperator; + } + + private int[] allOperationPathTags; + + public void setAllOperationPathTags(int[] allOperationPathTags){ + this.allOperationPathTags = allOperationPathTags; + } + + public int[] getAllOperationPathTags(){ + return allOperationPathTags; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.LineageInfo; @@ -80,7 +81,10 @@ // reducer private Map> groupOpToInputTables; private Map prunedPartitions; - + + // Maps the regular RS-GBY implementation of a group by operator to the corresponding pattern with + // map-side aggregation enabled (GBY-RS-GBY). This variable is only used by CorrelationOptimizer. + Map groupbyRegular2MapSide; /** * The lineage information.
*/ @@ -157,7 +161,8 @@ HashMap opToSamplePruner, SemanticAnalyzer.GlobalLimitCtx globalLimitCtx, HashMap nameToSplitSample, - HashSet semanticInputs, List> rootTasks) { + HashSet semanticInputs, List> rootTasks, + Map groupbyRegular2MapSide) { this.conf = conf; this.qb = qb; this.ast = ast; @@ -183,6 +188,7 @@ this.globalLimitCtx = globalLimitCtx; this.semanticInputs = semanticInputs; this.rootTasks = rootTasks; + this.groupbyRegular2MapSide = groupbyRegular2MapSide; } /** @@ -529,4 +535,8 @@ this.rootTasks.remove(rootTask); this.rootTasks.addAll(tasks); } + + public Map getGroupbyRegular2MapSide() { + return groupbyRegular2MapSide; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (working copy) @@ -23,6 +23,9 @@ import java.util.List; import org.apache.hadoop.hive.ql.plan.CollectDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; import org.apache.hadoop.hive.ql.plan.ExtractDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; @@ -91,6 +94,12 @@ HashTableDummyOperator.class)); opvec.add(new OpTuple(HashTableSinkDesc.class, HashTableSinkOperator.class)); + opvec.add(new OpTuple(CorrelationCompositeDesc.class, + CorrelationCompositeOperator.class)); + opvec.add(new OpTuple(CorrelationReducerDispatchDesc.class, + CorrelationReducerDispatchOperator.class)); + opvec.add(new OpTuple(CorrelationLocalSimulativeReduceSinkDesc.class, + CorrelationLocalSimulativeReduceSinkOperator.class)); } public static Operator get(Class opClass) { Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -60,16 +60,16 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; -import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; /** * General utility common functions for the Processor to convert operator into @@ -114,8 +114,14 @@ } if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } + if (op.getConf().getNeedsOperationPathTagging()) { + plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(true); + } + assert currTopOp != null; List> seenOps = opProcCtx.getSeenOps(); String 
currAliasId = opProcCtx.getCurrAliasId(); @@ -178,6 +184,7 @@ opTaskMap.put(reducer, currTask); if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf(); plan.setNumReduceTasks(desc.getNumReducers()); @@ -310,6 +317,7 @@ if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } initUnionPlan(opProcCtx, currTask, false); @@ -1053,6 +1061,7 @@ // dependent on the redTask if (reducer.getClass() == JoinOperator.class) { cplan.setNeedsTagging(true); + cplan.setNeedsOperationPathTagging(false); } } Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 1237326) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy) @@ -279,6 +279,7 @@ private void populateMapRedPlan3(Table src, Table src2) throws SemanticException { mr.setNumReduceTasks(Integer.valueOf(5)); mr.setNeedsTagging(true); + mr.setNeedsOperationPathTagging(false); ArrayList outputColumns = new ArrayList(); for (int i = 0; i < 2; i++) { outputColumns.add("_col" + i); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (working copy) @@ -80,6 +80,11 @@ if (conf != null && conf.isGatherStats()) { gatherStats(row); } + + if (conf != null && conf.isForwardRowNumber()) { + setRowNumber(rowNumber+1); + } + forward(row, inputObjInspectors[tag]); } @@ -169,6 +174,12 @@ if (conf == null) { return; } + + LOG.info(this.getName() + " forward row number " + conf.isForwardRowNumber()); + if(conf.isForwardRowNumber()){ + initializeRowNumber(); + } + if (!conf.isGatherStats()) { return; } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java (revision 0) @@ -0,0 +1,759 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator; +import org.apache.hadoop.hive.ql.exec.CorrelationLocalSimulativeReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.ForwardDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; + + +public final class CorrelationOptimizerUtils { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName()); + + public static boolean isExisted(ExprNodeDesc expr, ArrayList col_list) { + for (ExprNodeDesc thisExpr: col_list) { + if (expr.getExprString().equals(thisExpr.getExprString())) { + return true; + } + } + return false; + } + + public static String getColumnName(Map opColumnExprMap, ExprNodeDesc expr) { + for (Entry entry: opColumnExprMap.entrySet()) { + if (expr.getExprString().equals(entry.getValue().getExprString())) { + return entry.getKey(); + } + } + return null; + } + + + public static Operator unionUsedColumnsAndMakeNewSelect(ArrayList rsops, + IntraQueryCorrelation correlation, LinkedHashMap, + Map> originalOpColumnExprMap, TableScanOperator input, ParseContext pGraphContext, + LinkedHashMap, OpParseContext> originalOpParseCtx) { + + ArrayList columnNames = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList col_list = new ArrayList(); + RowResolver out_rwsch = new RowResolver(); + boolean isSelectAll = false; + + int pos = 0; + for (ReduceSinkOperator rsop: rsops) { + Operator curr = correlation.getBottom2TSops().get(rsop).get(0).getChildOperators().get(0); + while(true) { + if (curr.getName().equals("SEL")) { + SelectOperator selOp = 
(SelectOperator)curr; + if (selOp.getColumnExprMap() != null) { + for (Entry entry: selOp.getColumnExprMap().entrySet()) { + ExprNodeDesc expr = entry.getValue(); + if (!isExisted(expr, col_list) && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().containsKey(entry.getKey())) { + col_list.add(expr); + String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().get(entry.getKey()); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = entry.getKey(); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + pos++; + columnNames.add(outputName); + colExprMap.put(outputName, expr); + } + } + } else { + for (ExprNodeDesc expr: selOp.getConf().getColList()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver().getInvRslvMap().get(expr.getCols().get(0)); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + } + break; + } else if (curr.getName().equals("FIL")) { + isSelectAll = true; + break; + } else if (curr.getName().equals("RS")) { + ReduceSinkOperator thisRSop = (ReduceSinkOperator)curr; + for (ExprNodeDesc expr: thisRSop.getConf().getKeyCols()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + for (ExprNodeDesc expr: thisRSop.getConf().getValueCols()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + + break; + } else { + curr = curr.getChildOperators().get(0); + } + } + } + + Operator output; + if (isSelectAll) { + output = input; + } else { + output = putOpInsertMap(OperatorFactory.getAndMakeChild( + new SelectDesc(col_list, columnNames, false), new RowSchema( + out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx()); + output.setColumnExprMap(colExprMap); + output.setChildOperators(Utilities.makeList()); + + } + + return output; + } + + + public static Operator putOpInsertMap(Operator op, + RowResolver rr, LinkedHashMap, OpParseContext> opParseCtx) { + OpParseContext ctx = new OpParseContext(rr); + opParseCtx.put(op, ctx); + op.augmentPlan(); + return op; + } + + public static HashMap, String> getAliasIDtTopOps(HashMap> topOps) { + HashMap, String> aliasIDtTopOps = new 
HashMap, String>(); + for (Entry> entry: topOps.entrySet()) { + assert !aliasIDtTopOps.containsKey(entry.getValue()); + aliasIDtTopOps.put(entry.getValue(), entry.getKey()); + } + return aliasIDtTopOps; + } + + public static ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + public static ArrayList findPeerFakeReduceSinkOperators(CorrelationLocalSimulativeReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((CorrelationLocalSimulativeReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + // find how many layer's of Fake reduce sink + public static int getPostComputationDepth(IntraQueryCorrelation correlation) { + int depth = 0; + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + ReduceSinkOperator op = rsop; + int layer = 0; + while(!correlation.getTopReduceSinkOperators().contains(op)) { + assert correlation.getDown2upRSops().get(op).size() == 1; + op = correlation.getDown2upRSops().get(op).get(0); + layer++; + } + if (layer > depth) { + depth = layer; + } + } + assert depth >= 1; + return depth; + } + + + public static int getPostComputationDepthOfThisPlan(ReduceSinkOperator rsop, IntraQueryCorrelation correlation) { + int depth = 0; + ReduceSinkOperator op = rsop; + while(!correlation.getTopReduceSinkOperators().contains(op)) { + assert correlation.getDown2upRSops().get(op).size() == 1; + op = correlation.getDown2upRSops().get(op).get(0); + depth++; + } + assert depth >= 1; + return depth; + } + + public static ParseContext applyCorrelation(IntraQueryCorrelation correlation, ParseContext inputpGraphContext, + LinkedHashMap, Map> originalOpColumnExprMap, + LinkedHashMap, RowResolver> originalOpRowResolver, + Map groupbyRegular2MapSide, + LinkedHashMap, OpParseContext> originalOpParseCtx) throws SemanticException { + + ParseContext pGraphContext = inputpGraphContext; + + // 0: if necessary, replace RS-GBY to GBY-RS-GBY. 
In GBY-RS-GBY, the first GBY is in type of hash, so it can group records + LOG.info("apply correlation step 0: replace RS-GBY to GBY-RS-GBY"); + for (ReduceSinkOperator rsop: correlation.getRSGBYToBeReplacedByGBYRSGBY()) { + LOG.info("operator " + rsop.getIdentifier() + " should be replaced"); + assert !correlation.getBottomReduceSinkOperators().contains(rsop); + GroupByOperator mapSideGBY = groupbyRegular2MapSide.get(rsop); + assert (mapSideGBY.getChildOperators().get(0).getChildOperators().get(0) instanceof GroupByOperator); + ReduceSinkOperator newRsop = (ReduceSinkOperator)mapSideGBY.getChildOperators().get(0); + GroupByOperator reduceSideGBY = (GroupByOperator)newRsop.getChildOperators().get(0); + GroupByOperator oldReduceSideGBY = (GroupByOperator)rsop.getChildOperators().get(0); + List> parents = rsop.getParentOperators(); + List> children = oldReduceSideGBY.getChildOperators(); + mapSideGBY.setParentOperators(parents); + for (Operator parent: parents) { + parent.replaceChild(rsop, mapSideGBY); + } + reduceSideGBY.setChildOperators(children); + for (Operator child: children) { + child.replaceParent(oldReduceSideGBY, reduceSideGBY); + } + correlation.getAllReduceSinkOperators().remove(rsop); + correlation.getAllReduceSinkOperators().add(newRsop); + } + + + Operator curr; + + // 1: Create table scan operator + LOG.info("apply correlation step 1: create table scan operator"); + HashMap oldTSOP2newTSOP = new HashMap(); + HashMap> oldTopOps = pGraphContext.getTopOps(); + HashMap, String> oldAliasIDtTopOps = getAliasIDtTopOps(oldTopOps); + HashMap oldTopToTable = pGraphContext.getTopToTable(); + HashMap> addedTopOps = new HashMap>(); + HashMap addedTopToTable = new HashMap(); + for (Entry> entry: correlation.getTable2CorrelatedTSops().entrySet()) { + TableScanOperator oldTSop = entry.getValue().get(0); + TableScanDesc tsDesc = new TableScanDesc(oldTSop.getConf().getAlias(), oldTSop.getConf().getVirtualCols()); + tsDesc.setForwardRowNumber(true); + OpParseContext opParseCtx= pGraphContext.getOpParseCtx().get(oldTSop); + Operator top = putOpInsertMap(OperatorFactory.get(tsDesc, + new RowSchema(opParseCtx.getRowResolver().getColumnInfos())), + opParseCtx.getRowResolver(), pGraphContext.getOpParseCtx()); + top.setParentOperators(null); + top.setChildOperators(Utilities.makeList()); + for (TableScanOperator tsop: entry.getValue()) { + addedTopOps.put(oldAliasIDtTopOps.get(tsop), top); + addedTopToTable.put((TableScanOperator) top, oldTopToTable.get(tsop)); + oldTSOP2newTSOP.put(tsop, (TableScanOperator)top); + } + } + + int postComputationDepth = getPostComputationDepth(correlation); + ArrayList> childrenOfDispatch = new ArrayList>(); + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + int thisPostComputationDepth = getPostComputationDepthOfThisPlan(rsop, correlation); + // TODO: currently, correlation optimizer can not handle the case that + // a table is directly connected to a post computation operator. 
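+ // The check below enforces this restriction: every peer ReduceSinkOperator of a bottom ReduceSinkOperator must itself be a bottom ReduceSinkOperator; otherwise a SemanticException is thrown.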
+ assert correlation.getBottomReduceSinkOperators().containsAll(findPeerReduceSinkOperators(rsop)); + if (!correlation.getBottomReduceSinkOperators().containsAll(findPeerReduceSinkOperators(rsop))) { + throw new SemanticException("orrelation optimizer can not handle the case that a table is directly connected to a post computation operator"); + } + Operator op = rsop.getChildOperators().get(0); + if (!childrenOfDispatch.contains(op)) { + LOG.info("Add :" + op.getIdentifier() + " " + op.getName() + " to the children list of dispatch operator"); + childrenOfDispatch.add(op); + } + } + + int opTag = 0; + HashMap operationPath2CorrelationReduceSinkOps = new HashMap(); + for (Entry> entry: correlation.getTable2CorrelatedRSops().entrySet()) { + + // 2: Create select operator for shared op plans + LOG.info("apply correlation step 2: create select operator for shared operation path for the table of " + entry.getKey()); + curr = unionUsedColumnsAndMakeNewSelect(entry.getValue(), correlation, originalOpColumnExprMap, + oldTSOP2newTSOP.get(correlation.getBottom2TSops().get(entry.getValue().get(0)).get(0)), pGraphContext, + originalOpParseCtx); + + // 3: Create CorrelationCompositeOperator, CorrelationReduceSinkOperator + LOG.info("apply correlation step 3: create correlation composite Operator and correlation reduce sink operator for the table of " + entry.getKey()); + curr = createCorrelationCompositeReducesinkOperaotr( + correlation.getTable2CorrelatedTSops().get(entry.getKey()), entry.getValue(), correlation, curr, pGraphContext, + childrenOfDispatch, entry.getKey(), originalOpColumnExprMap, opTag, originalOpRowResolver); + + operationPath2CorrelationReduceSinkOps.put(new Integer(opTag), (ReduceSinkOperator)curr); + opTag++; + } + + + // 4: Create correlation dispatch operator for operation paths + LOG.info("apply correlation step 4: create correlation dispatch operator for operation paths"); + RowResolver outputRS = new RowResolver(); + List> correlationReduceSinkOps = new ArrayList>(); + for (Entry entry: operationPath2CorrelationReduceSinkOps.entrySet()) { + Integer opTagInteger = entry.getKey(); + curr = entry.getValue(); + correlationReduceSinkOps.add((ReduceSinkOperator)curr); + RowResolver inputRS = pGraphContext.getOpParseCtx().get(curr).getRowResolver(); + for (Entry> e1: inputRS.getRslvMap().entrySet()) { + for (Entry e2: e1.getValue().entrySet()) { + outputRS.put(e1.getKey(), e2.getKey(), e2.getValue()); + } + } + } + + Operator dispatchOp = putOpInsertMap(OperatorFactory.get( + new CorrelationReducerDispatchDesc(correlation.getDispatchConf(), correlation.getDispatchKeySelectDescConf(), correlation.getDispatchValueSelectDescConf()), + new RowSchema(outputRS.getColumnInfos())), + outputRS, pGraphContext.getOpParseCtx()); + + dispatchOp.setParentOperators(correlationReduceSinkOps); + for (Operator thisOp: correlationReduceSinkOps) { + thisOp.setChildOperators(Utilities.makeList(dispatchOp)); + } + + // 5: Replace the old plan in the original plan tree with new plan + LOG.info("apply correlation step 5: Replace the old plan in the original plan tree with the new plan"); + HashSet> processed = new HashSet>(); + for (Operator op: childrenOfDispatch) { + ArrayList> parents = new ArrayList>(); + for (Operator oldParent: op.getParentOperators()) { + if (!correlation.getBottomReduceSinkOperators().contains(oldParent)) { + parents.add(oldParent); + } + } + parents.add(dispatchOp); + op.setParentOperators(parents); + } + dispatchOp.setChildOperators(childrenOfDispatch); + HashMap> newTopOps = new 
HashMap>(); + for (Entry> entry: oldTopOps.entrySet()) { + if (addedTopOps.containsKey(entry.getKey())) { + newTopOps.put(entry.getKey(), addedTopOps.get(entry.getKey())); + } else { + newTopOps.put(entry.getKey(), entry.getValue()); + } + } + pGraphContext.setTopOps(newTopOps); + HashMap newTopToTable = new HashMap(); + for (Entry entry: oldTopToTable.entrySet()) { + if (addedTopToTable.containsKey(oldTSOP2newTSOP.get(entry.getKey()))) { + newTopToTable.put(oldTSOP2newTSOP.get(entry.getKey()), + addedTopToTable.get(oldTSOP2newTSOP.get(entry.getKey()))); + } else { + newTopToTable.put(entry.getKey(), entry.getValue()); + } + } + pGraphContext.setTopToTable(newTopToTable); + + // 6: Change each JFC related ReduceSinkOperator to a CorrelationFakeReduceSinkOperator + LOG.info("apply correlation step 6: Change each JFC related reduce sink operator to a correlation fake reduce sink operator"); + HashMap, ArrayList>> newParentsOfChildren = + new HashMap, ArrayList>>(); + for (ReduceSinkOperator rsop: correlation.getAllReduceSinkOperators()) { + if (!correlation.getBottomReduceSinkOperators().contains(rsop)) { + Operator childOP = rsop.getChildOperators().get(0); + Operator parentOP = rsop.getParentOperators().get(0); + Operator correlationFakeReduceSinkOperator = putOpInsertMap(OperatorFactory.get( + new CorrelationLocalSimulativeReduceSinkDesc(rsop.getConf()), + new RowSchema(pGraphContext.getOpParseCtx().get(rsop).getRowResolver().getColumnInfos())), + pGraphContext.getOpParseCtx().get(rsop).getRowResolver(), pGraphContext.getOpParseCtx()); + correlationFakeReduceSinkOperator.setChildOperators(Utilities.makeList(childOP)); + correlationFakeReduceSinkOperator.setParentOperators(Utilities.makeList(parentOP)); + parentOP.getChildOperators().set(parentOP.getChildOperators().indexOf(rsop), correlationFakeReduceSinkOperator); + childOP.getParentOperators().set(childOP.getParentOperators().indexOf(rsop), correlationFakeReduceSinkOperator); + } + } + + return pGraphContext; + } + + public static Operator createCorrelationCompositeReducesinkOperaotr( + ArrayList tsops, ArrayList rsops, + IntraQueryCorrelation correlation, + Operator input, ParseContext pGraphContext, + ArrayList> childrenOfDispatch, String tableName, + LinkedHashMap, Map> originalOpColumnExprMap, int newTag, + LinkedHashMap, RowResolver> originalOpRowResolver) throws SemanticException { + + // Create CorrelationCompositeOperator + RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver(); + ArrayList> tops = new ArrayList>(); + ArrayList> bottoms = new ArrayList>(); + ArrayList opTags = new ArrayList(); + + for (ReduceSinkOperator rsop: rsops) { + TableScanOperator tsop = correlation.getBottom2TSops().get(rsop).get(0); + Operator curr = tsop.getChildOperators().get(0); + if (curr == rsop) { + // no filter needed, just forward + ForwardDesc forwardCtx = new ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + } else { + // Add filter operator + FilterOperator currFilOp = null; + while(curr != rsop) { + if (curr.getName().equals("FIL")) { + FilterOperator fil = (FilterOperator)curr; + FilterDesc filterCtx = new FilterDesc(fil.getConf().getPredicate(), false); + Operator nowFilOp = OperatorFactory.get(FilterDesc.class); + nowFilOp.setConf(filterCtx); + if (currFilOp == null) { + currFilOp = (FilterOperator)nowFilOp; + 
tops.add(currFilOp); + } else { + nowFilOp.setParentOperators(Utilities.makeList(currFilOp)); + currFilOp.setChildOperators(Utilities.makeList(nowFilOp)); + currFilOp = (FilterOperator) nowFilOp; + } + } + curr = curr.getChildOperators().get(0); + } + if (currFilOp == null) { + ForwardDesc forwardCtx = new ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + } else { + bottoms.add(currFilOp); + } + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + + } + } + + int[] opTagsArray = new int[opTags.size()]; + for (int i=0; i op : bottoms) { + op.setParentOperators(Utilities.makeList(input)); + } + input.setChildOperators(bottoms); + + CorrelationCompositeDesc ycoCtx = new CorrelationCompositeDesc(); + ycoCtx.setAllOperationPathTags(opTagsArray); + + Operator ycop = putOpInsertMap(OperatorFactory.get(ycoCtx, + new RowSchema(inputRR.getColumnInfos())), + inputRR, pGraphContext.getOpParseCtx()); + ycop.setParentOperators(tops); + for (Operator op : tops) { + op.setChildOperators(Utilities.makeList(ycop)); + } + + // Create CorrelationReduceSinkOperator + ArrayList partitionCols = new ArrayList(); + ArrayList keyCols = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList keyOutputColumnNames = new ArrayList(); + ReduceSinkOperator firstRsop = rsops.get(0); + + RowResolver firstRsopRS = pGraphContext.getOpParseCtx().get(firstRsop).getRowResolver(); + RowResolver orginalFirstRsopRS = originalOpRowResolver.get(firstRsop); + RowResolver outputRS = new RowResolver(); + HashMap keyCol2ExprForDispatch = new HashMap(); + HashMap valueCol2ExprForDispatch = new HashMap(); + + for (ExprNodeDesc expr: firstRsop.getConf().getKeyCols()) { + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr; + String ouputName = getColumnName(originalOpColumnExprMap.get(firstRsop), expr); + ColumnInfo cinfo = orginalFirstRsopRS.getColumnInfos().get(orginalFirstRsopRS.getPosition(ouputName)); + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size()); + keyOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + + colExprMap.put(newColInfo.getInternalName(), expr); + + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + keyCols.add(expr); + + keyCol2ExprForDispatch.put(encd.getColumn(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName, + encd.getIsPartitionColOrVirtualCol())); + + } + + ArrayList valueCols = new ArrayList(); + ArrayList valueOutputColumnNames = new ArrayList(); + + correlation.addOperationPathToDispatchConf(newTag); + correlation.addOperationPathToDispatchKeySelectDescConf(newTag); + correlation.addOperationPathToDispatchValueSelectDescConf(newTag); + + + for (ReduceSinkOperator rsop: rsops) { + LOG.debug("Analyzing ReduceSinkOperator " + rsop.getIdentifier()); + RowResolver rs = pGraphContext.getOpParseCtx().get(rsop).getRowResolver(); + RowResolver orginalRS = originalOpRowResolver.get(rsop); + Integer childOpIndex = childrenOfDispatch.indexOf(rsop.getChildOperators().get(0)); + int outputTag = rsop.getConf().getTag(); + if (outputTag == -1) { + outputTag = 0; + } + if (!correlation.getDispatchConfForOperationPath(newTag).containsKey(childOpIndex)) { + correlation.getDispatchConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + 
correlation.getDispatchConfForOperationPath(newTag).get(childOpIndex).add(outputTag); + + ArrayList thisKeyColsInDispatch = new ArrayList(); + ArrayList outputKeyNamesInDispatch = new ArrayList(); + for (ExprNodeDesc expr: rsop.getConf().getKeyCols()) { + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr; + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + LOG.debug("key column: " + outputName); + thisKeyColsInDispatch.add(keyCol2ExprForDispatch.get(encd.getColumn())); + String[] names = outputName.split("\\."); + String outputKeyName = ""; + switch (names.length) { + case 1: outputKeyName = names[0]; break; + case 2: outputKeyName = names[1]; break; + default: throw(new SemanticException("found a un-sopported internal key name structure")); + } + outputKeyNamesInDispatch.add(outputKeyName); + } + + if (!correlation.getDispatchKeySelectDescConfForOperationPath(newTag).containsKey(childOpIndex)) { + correlation.getDispatchKeySelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchKeySelectDescConfForOperationPath(newTag).get(childOpIndex). + add(new SelectDesc(thisKeyColsInDispatch, outputKeyNamesInDispatch, false)); + + ArrayList thisValueColsInDispatch = new ArrayList(); + ArrayList outputValueNamesInDispatch = new ArrayList(); + for (ExprNodeDesc expr: rsop.getConf().getValueCols()) { + + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + LOG.debug("value column: " + outputName); + LOG.debug("originalOpColumnExprMap.get(rsop):" + originalOpColumnExprMap.get(rsop) + + " expr:" + expr.toString() + + " orginalRS.getColumnInfos().toString:" + orginalRS.getColumnInfos().toString() + " " + outputName ); + ColumnInfo cinfo = orginalRS.getColumnInfos().get(orginalRS.getPosition(outputName)); + if (!valueCol2ExprForDispatch.containsKey(expr.getExprString())) { + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size() + valueCols.size()); + valueOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + colExprMap.put(newColInfo.getInternalName(), expr); + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + valueCols.add(expr); + + valueCol2ExprForDispatch.put(expr.getExprString(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName, + false)); + } + + thisValueColsInDispatch.add(valueCol2ExprForDispatch.get(expr.getExprString())); + String[] names = outputName.split("\\."); + String outputValueName = ""; + switch (names.length) { + case 1: outputValueName = names[0]; break; + case 2: outputValueName = names[1]; break; + default: throw(new SemanticException("found a un-sopported internal value name structure")); + } + outputValueNamesInDispatch.add(outputValueName); + } + + if (!correlation.getDispatchValueSelectDescConfForOperationPath(newTag).containsKey(childOpIndex)) { + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).get(childOpIndex). 
+ add(new SelectDesc(thisValueColsInDispatch, outputValueNamesInDispatch, false)); + } + + ReduceSinkOperator rsOp = null; + try { + rsOp = (ReduceSinkOperator) putOpInsertMap( + OperatorFactory.getAndMakeChild(getReduceSinkDesc(keyCols, + keyCols.size(), valueCols, new ArrayList>(), + keyOutputColumnNames, valueOutputColumnNames, true, newTag, keyCols.size(), + -1), new RowSchema(outputRS + .getColumnInfos()), ycop), outputRS, pGraphContext.getOpParseCtx()); + rsOp.setColumnExprMap(colExprMap); + ((CorrelationCompositeOperator)ycop).getConf().setCorrespondingReduceSinkOperator(rsOp); + } catch (SemanticException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return rsOp; + } + + + /** + * Create the correlation reduce sink descriptor. + * + * @param keyCols + * The columns to be stored in the key + * @param numKeys number of distribution keys. Equals to group-by-key + * numbers usually. + * @param valueCols + * The columns to be stored in the value + * @param distinctColIndices + * column indices for distinct aggregates + * @param outputKeyColumnNames + * The output key columns names + * @param outputValueColumnNames + * The output value columns names + * @param tag + * The tag for this reducesink + * @param numPartitionFields + * The first numPartitionFields of keyCols will be partition columns. + * If numPartitionFields=-1, then partition randomly. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The YSmartReduceSinkDesc object. + */ + public static ReduceSinkDesc getReduceSinkDesc( + ArrayList keyCols, int numKeys, + ArrayList valueCols, + List> distinctColIndices, + List outputKeyColumnNames, List outputValueColumnNames, + boolean includeKey, int tag, + int numPartitionFields, int numReducers) throws SemanticException { + ArrayList partitionCols = null; + + if (numPartitionFields >= keyCols.size()) { + partitionCols = keyCols; + } else if (numPartitionFields >= 0) { + partitionCols = new ArrayList(numPartitionFields); + for (int i = 0; i < numPartitionFields; i++) { + partitionCols.add(keyCols.get(i)); + } + } else { + // numPartitionFields = -1 means random partitioning + partitionCols = new ArrayList(1); + partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor + .getFuncExprNodeDesc("rand")); + } + + StringBuilder order = new StringBuilder(); + for (int i = 0; i < keyCols.size(); i++) { + order.append("+"); + } + + TableDesc keyTable = null; + TableDesc valueTable = null; + ArrayList outputKeyCols = new ArrayList(); + ArrayList outputValCols = new ArrayList(); + if (includeKey) { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnListWithLength( + keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""), + order.toString()); + outputKeyCols.addAll(outputKeyColumnNames); + } else { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList( + keyCols, "reducesinkkey"),order.toString()); + for (int i = 0; i < keyCols.size(); i++) { + outputKeyCols.add("reducesinkkey" + i); + } + } + valueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueCols, outputValueColumnNames, 0, "")); + outputValCols.addAll(outputValueColumnNames); + + return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols, + distinctColIndices, outputValCols, + tag, partitionCols, numReducers, keyTable, + valueTable, true); + } + +} Index: ql/src/test/results/compiler/plan/groupby2.q.xml 
=================================================================== --- ql/src/test/results/compiler/plan/groupby2.q.xml (revision 1237326) +++ ql/src/test/results/compiler/plan/groupby2.q.xml (working copy) @@ -177,6 +177,56 @@ + + VALUE._col1 + + + _col3 + + + + + + + + double + + + + + + + KEY._col1:0._col0 + + + _col1 + + + + + + + + + + + VALUE._col0 + + + _col2 + + + + + + + + bigint + + + + + @@ -198,17 +248,7 @@ - - - _col1 - - - - - - - - + @@ -280,38 +320,10 @@ - - - _col2 - - - - - - - - bigint - - - - + - - - _col3 - - - - - - - - double - - - - + @@ -402,7 +414,7 @@ VALUE._col0 - + @@ -412,7 +424,7 @@ VALUE._col1 - + @@ -781,7 +793,7 @@ - + @@ -794,7 +806,7 @@ - + @@ -821,7 +833,7 @@ src - + @@ -1044,7 +1056,7 @@ src - + @@ -1343,7 +1355,7 @@ - + @@ -1397,7 +1409,7 @@ - + @@ -1423,7 +1435,7 @@ _col1 - + _col1 @@ -1431,13 +1443,13 @@ - + _col0 - + _col0 @@ -1456,10 +1468,10 @@ - + - + @@ -1533,7 +1545,7 @@ _col1 - + @@ -1562,7 +1574,7 @@ _col0 - + KEY._col0 @@ -1630,7 +1642,7 @@ VALUE._col1 - + @@ -1646,7 +1658,7 @@ - + @@ -1725,7 +1737,7 @@ - + @@ -1738,7 +1750,7 @@ - + Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java (revision 0) @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map.Entry; + + +/** + * Correlation dispatch operator Descriptor implementation. 
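+ * It keeps three maps, each keyed by the input operation path tag: dispatchConf gives, for every child of
+ * the dispatch operator, the output tags to emit; dispatchKeySelectDescConf and dispatchValueSelectDescConf
+ * give the SelectDescs used to project the key and value columns that each child expects. As a purely
+ * illustrative example, a dispatchConf of {0 -> {0 -> [0], 1 -> [0]}, 1 -> {0 -> [1]}} would forward
+ * operation path 0 to children 0 and 1 with output tag 0, and operation path 1 to child 0 with output tag 1.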
+ * + */ +@Explain(displayName = "Correlation Dispatch Operator") +public class CorrelationReducerDispatchDesc implements Serializable { + + private static final long serialVersionUID = 1L; + + + private HashMap>> dispatchConf; + private HashMap>> dispatchValueSelectDescConf; + private HashMap>> dispatchKeySelectDescConf; + + public CorrelationReducerDispatchDesc(){ + this.dispatchConf = new HashMap>>(); + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + + } + + public CorrelationReducerDispatchDesc(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + for(Entry>> entry: this.dispatchConf.entrySet()){ + HashMap> tmp = new HashMap>(); + for(Integer child: entry.getValue().keySet()){ + tmp.put(child, new ArrayList()); + tmp.get(child).add(new SelectDesc(true)); + } + this.dispatchValueSelectDescConf.put(entry.getKey(), tmp); + this.dispatchKeySelectDescConf.put(entry.getKey(), tmp); + } + } + + public CorrelationReducerDispatchDesc(HashMap>> dispatchConf, + HashMap>> dispatchKeySelectDescConf, + HashMap>> dispatchValueSelectDescConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public void setDispatchConf(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + } + + public HashMap>> getDispatchConf(){ + return this.dispatchConf; + } + + public void setDispatchValueSelectDescConf(HashMap>> dispatchValueSelectDescConf){ + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchValueSelectDescConf(){ + return this.dispatchValueSelectDescConf; + } + + public void setDispatchKeySelectDescConf(HashMap>> dispatchKeySelectDescConf){ + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return this.dispatchKeySelectDescConf; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy) @@ -72,6 +72,7 @@ private Long minSplitSizePerRack; private boolean needsTagging; + private boolean needsOperationPathTagging; private boolean hadoopSupportsSplittable; private MapredLocalWork mapLocalWork; @@ -338,6 +339,16 @@ this.needsTagging = needsTagging; } + // TODO: include "Needs Operation Paths Tagging: false" into correct results + // @Explain(displayName = "Needs Operation Paths Tagging", normalExplain = false) + public boolean getNeedsOperationPathTagging() { + return needsOperationPathTagging; + } + + public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) { + this.needsOperationPathTagging = needsOperationPathTagging; + } + public boolean getHadoopSupportsSplittable() { return hadoopSupportsSplittable; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java (revision 0) @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.List; + +/** + * Correlation Fake ReduceSinkDesc. + * + */ +@Explain(displayName = "Correlation Local Simulative Reduce Output Operator") +public class CorrelationLocalSimulativeReduceSinkDesc extends BaseReduceSinkDesc implements Serializable { + private static final long serialVersionUID = 1L; + + public CorrelationLocalSimulativeReduceSinkDesc() { + } + + public CorrelationLocalSimulativeReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this.keyCols = keyCols; + this.numDistributionKeys = numDistributionKeys; + this.valueCols = valueCols; + this.outputKeyColumnNames = outputKeyColumnNames; + this.outputValueColumnNames = outputValueColumnNames; + this.tag = tag; + this.numReducers = numReducers; + this.partitionCols = partitionCols; + this.keySerializeInfo = keySerializeInfo; + this.valueSerializeInfo = valueSerializeInfo; + this.distinctColumnIndices = distinctColumnIndices; + } + + public CorrelationLocalSimulativeReduceSinkDesc(ReduceSinkDesc reduceSinkDesc){ + this.keyCols = reduceSinkDesc.getKeyCols(); + this.numDistributionKeys = reduceSinkDesc.getNumDistributionKeys(); + this.valueCols = reduceSinkDesc.getValueCols(); + this.outputKeyColumnNames = reduceSinkDesc.getOutputKeyColumnNames(); + this.outputValueColumnNames = reduceSinkDesc.getOutputValueColumnNames(); + this.tag = reduceSinkDesc.getTag(); + this.numReducers = reduceSinkDesc.getNumReducers(); + this.partitionCols = reduceSinkDesc.getPartitionCols(); + this.keySerializeInfo = reduceSinkDesc.getKeySerializeInfo(); + this.valueSerializeInfo = reduceSinkDesc.getValueSerializeInfo(); + this.distinctColumnIndices = reduceSinkDesc.getDistinctColumnIndices(); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.OutputCollector; @@ 
-512,7 +513,7 @@ LOG.debug("End group Done"); } - protected boolean allInitializedParentsAreClosed() { + public boolean allInitializedParentsAreClosed() { if (parentOperators != null) { for (Operator parent : parentOperators) { if(parent==null){ @@ -1332,4 +1333,52 @@ public void cleanUpInputFileChangedOp() throws HiveException { } + // bytesWritableGroupKey is only used when a query plan is optimized by CorrelationOptimizer. + // CorrelationFakeReduceSinkOperator will use this variable to determine when it needs to start or end the group + // for its child operator. + protected BytesWritable bytesWritableGroupKey; + + public void setBytesWritableGroupKey(BytesWritable groupKey) { + if (bytesWritableGroupKey == null) { + bytesWritableGroupKey = new BytesWritable(); + } + bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize()); + } + + public BytesWritable getBytesWritableGroupKey() { + return bytesWritableGroupKey; + } + + // The number of current row + protected long rowNumber; + + public void initializeRowNumber() { + this.rowNumber = 0L; + LOG.info("Operator " + id + " " + getName() + " row number initialized to 0"); + if (childOperators == null) { + return; + } + LOG.info("Initializing row numbers of children of " + id + " " + getName()); + for (int i = 0; i < childOperatorsArray.length; i++) { + childOperatorsArray[i].initializeRowNumber(); + } + } + + public void setRowNumber(long rowNumber) { + this.rowNumber = rowNumber; + if (childOperators == null) { + return; + } + for (int i = 0; i < childOperatorsArray.length; i++) { + assert rowNumber >= childOperatorsArray[i].getRowNumber(); + if (rowNumber != childOperatorsArray[i].getRowNumber()) { + childOperatorsArray[i].setRowNumber(rowNumber); + } + } + } + + public long getRowNumber() { + return rowNumber; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -186,7 +186,7 @@ private List loadTableWork; private List loadFileWork; private Map joinContext; - private final HashMap topToTable; + private HashMap topToTable; private QB qb; private ASTNode ast; private int destTableId; @@ -207,6 +207,9 @@ private final UnparseTranslator unparseTranslator; private final GlobalLimitCtx globalLimitCtx = new GlobalLimitCtx(); + // store the variable ParseCOntext.groupbyRegular2MapSide + Map groupbyRegular2MapSide; + //prefix for column names auto generated by hive private final String autogenColAliasPrfxLbl; private final boolean autogenColAliasPrfxIncludeFuncName; @@ -287,6 +290,7 @@ autogenColAliasPrfxIncludeFuncName = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME); queryProperties = new QueryProperties(); + groupbyRegular2MapSide = new HashMap(); } @Override @@ -305,6 +309,8 @@ opParseCtx.clear(); groupOpToInputTables.clear(); prunedPartitions.clear(); + topToTable.clear(); + groupbyRegular2MapSide.clear(); } public void init(ParseContext pctx) { @@ -312,6 +318,7 @@ opToPartList = pctx.getOpToPartList(); opToSamplePruner = pctx.getOpToSamplePruner(); topOps = pctx.getTopOps(); + topToTable = pctx.getTopToTable(); topSelOps = pctx.getTopSelOps(); opParseCtx = pctx.getOpParseCtx(); loadTableWork = pctx.getLoadTableWork(); @@ -326,6 +333,7 @@ groupOpToInputTables = pctx.getGroupOpToInputTables(); prunedPartitions = 
pctx.getPrunedPartitions(); setLineageInfo(pctx.getLineageInfo()); + groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide(); } public ParseContext getParseContext() { @@ -333,7 +341,8 @@ topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, + groupbyRegular2MapSide); } @SuppressWarnings("nls") @@ -2904,7 +2913,7 @@ colExprMap); List> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest, - reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames); + reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, colExprMap); ArrayList reduceValues = new ArrayList(); HashMap aggregationTrees = parseInfo @@ -2912,7 +2921,7 @@ if (!mapAggrDone) { getReduceValuesForReduceSinkNoMapAgg(parseInfo, dest, reduceSinkInputRowResolver, - reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues); + reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues, colExprMap); } else { // Put partial aggregation results in reduceValues int inputField = reduceKeys.size(); @@ -2921,14 +2930,16 @@ TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get( inputField).getType(); - reduceValues.add(new ExprNodeColumnDesc(type, - getColumnInternalName(inputField), "", false)); + ExprNodeDesc expr = new ExprNodeColumnDesc(type, + getColumnInternalName(inputField), "", false); + reduceValues.add(expr); inputField++; outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." 
+ getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(entry.getValue(), - new ColumnInfo(field, type, null, false)); + ColumnInfo colInfo = new ColumnInfo(field, type, null, false); + reduceSinkOutputRowResolver.putExpression(entry.getValue(), colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } @@ -2973,7 +2984,8 @@ private List> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, String dest, ArrayList reduceKeys, RowResolver reduceSinkInputRowResolver, - RowResolver reduceSinkOutputRowResolver, List outputKeyColumnNames) + RowResolver reduceSinkOutputRowResolver, List outputKeyColumnNames, + Map colExprMap) throws SemanticException { List> distinctColIndices = new ArrayList>(); @@ -3012,6 +3024,7 @@ ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false); reduceSinkOutputRowResolver.putExpression(parameter, colInfo); numExprs++; + colExprMap.put(colInfo.getInternalName(), expr); } distinctColIndices.add(distinctIndices); } @@ -3022,7 +3035,8 @@ private void getReduceValuesForReduceSinkNoMapAgg(QBParseInfo parseInfo, String dest, RowResolver reduceSinkInputRowResolver, RowResolver reduceSinkOutputRowResolver, - List outputValueColumnNames, ArrayList reduceValues) + List outputValueColumnNames, ArrayList reduceValues, + Map colExprMap) throws SemanticException { HashMap aggregationTrees = parseInfo .getAggregationExprsForClause(dest); @@ -3034,15 +3048,16 @@ for (int i = 1; i < value.getChildCount(); i++) { ASTNode parameter = (ASTNode) value.getChild(i); if (reduceSinkOutputRowResolver.getExpression(parameter) == null) { - reduceValues.add(genExprNodeDesc(parameter, - reduceSinkInputRowResolver)); + ExprNodeDesc expr = genExprNodeDesc(parameter, reduceSinkInputRowResolver); + reduceValues.add(expr); outputValueColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." 
+ getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field, - reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, - false)); + ColumnInfo colInfo = new ColumnInfo(field, + reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, false); + reduceSinkOutputRowResolver.putExpression(parameter, colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } } @@ -3073,7 +3088,7 @@ colExprMap); List> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest, - reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames); + reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, colExprMap); ArrayList reduceValues = new ArrayList(); @@ -3082,7 +3097,7 @@ for (String destination : dests) { getReduceValuesForReduceSinkNoMapAgg(parseInfo, destination, reduceSinkInputRowResolver, - reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues); + reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues, colExprMap); // Need to pass all of the columns used in the where clauses as reduce values ASTNode whereClause = parseInfo.getWhrForClause(destination); @@ -3092,15 +3107,18 @@ for (int i = 0; i < columnExprs.size(); i++) { ASTNode parameter = columnExprs.get(i); if (reduceSinkOutputRowResolver.getExpression(parameter) == null) { - reduceValues.add(genExprNodeDesc(parameter, - reduceSinkInputRowResolver)); + ExprNodeDesc expr = genExprNodeDesc(parameter, + reduceSinkInputRowResolver); + reduceValues.add(expr); outputValueColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." + getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field, + ColumnInfo colInfo = new ColumnInfo(field, reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, - false)); + false); + reduceSinkOutputRowResolver.putExpression(parameter, colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } } @@ -3197,13 +3215,16 @@ ASTNode t = entry.getValue(); TypeInfo typeInfo = reduceSinkInputRowResolver2.getExpression(t) .getType(); - reduceValues.add(new ExprNodeColumnDesc(typeInfo, field, "", false)); + ExprNodeColumnDesc inputExpr = new ExprNodeColumnDesc(typeInfo, field, + "", false); + reduceValues.add(inputExpr); inputField++; String col = getColumnInternalName(reduceValues.size() - 1); outputColumnNames.add(col); - reduceSinkOutputRowResolver2.putExpression(t, new ColumnInfo( - Utilities.ReduceField.VALUE.toString() + "." + col, typeInfo, "", - false)); + ColumnInfo colInfo = new ColumnInfo(Utilities.ReduceField.VALUE.toString() + + "." 
+ col, typeInfo, "", false); + reduceSinkOutputRowResolver2.putExpression(t, colInfo); + colExprMap.put(colInfo.getInternalName(), inputExpr); } ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( @@ -5819,6 +5840,7 @@ reduceValues.size() - 1).getTypeInfo(), "", false); reduceSinkOutputRowResolver.putExpression(grpbyExpr, colInfo); outputColumnNames.add(getColumnInternalName(reduceValues.size() - 1)); + colExprMap.put(colInfo.getInternalName(), grpByExprNode); } } @@ -5845,6 +5867,7 @@ reduceSinkOutputRowResolver.putExpression(paraExpr, colInfo); outputColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); + colExprMap.put(colInfo.getInternalName(), paraExprNode); } } } @@ -6081,7 +6104,8 @@ // insert a select operator here used by the ColumnPruner to reduce // the data to shuffle curr = insertSelectAllPlanForGroupBy(curr); - if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + !conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { curr = genGroupByPlanMapAggr1MR(dest, qb, curr); } else { @@ -6090,7 +6114,19 @@ } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { curr = genGroupByPlan2MR(dest, qb, curr); } else { + Operator mapSide = null; + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { + mapSide = genGroupByPlanMapAggr1MR(dest, qb, curr); + mapSide = (Operator)((Operator)mapSide.getParentOperators().get(0)).getParentOperators().get(0); + curr.getChildOperators().remove(mapSide); + } curr = genGroupByPlan1MR(dest, qb, curr); + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { + groupbyRegular2MapSide.put((ReduceSinkOperator )curr.getParentOperators().get(0), + (GroupByOperator)mapSide); + } } } @@ -7500,7 +7536,8 @@ opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, + groupbyRegular2MapSide); Optimizer optm = new Optimizer(); optm.setPctx(pCtx); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (working copy) @@ -26,48 +26,20 @@ * */ @Explain(displayName = "Reduce Output Operator") -public class ReduceSinkDesc implements Serializable { +public class ReduceSinkDesc extends BaseReduceSinkDesc implements Serializable { private static final long serialVersionUID = 1L; - /** - * Key columns are passed to reducer in the "key". - */ - private java.util.ArrayList keyCols; - private java.util.ArrayList outputKeyColumnNames; - private List> distinctColumnIndices; - /** - * Value columns are passed to reducer in the "value". - */ - private java.util.ArrayList valueCols; - private java.util.ArrayList outputValueColumnNames; - /** - * Describe how to serialize the key. - */ - private TableDesc keySerializeInfo; - /** - * Describe how to serialize the value. - */ - private TableDesc valueSerializeInfo; - /** - * The tag for this reducesink descriptor. 
- */ - private int tag; + private boolean needsOperationPathTagging; + public boolean getNeedsOperationPathTagging() { + return needsOperationPathTagging; + } - /** - * Number of distribution keys. - */ - private int numDistributionKeys; + public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) { + this.needsOperationPathTagging = needsOperationPathTagging; + } - /** - * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). - * Partition columns decide the reducer that the current row goes to. - * Partition columns are not passed to reducer. - */ - private java.util.ArrayList partitionCols; + public ReduceSinkDesc(){ - private int numReducers; - - public ReduceSinkDesc() { } public ReduceSinkDesc(java.util.ArrayList keyCols, @@ -78,6 +50,20 @@ java.util.ArrayList outputValueColumnNames, int tag, java.util.ArrayList partitionCols, int numReducers, final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this(keyCols, numDistributionKeys, valueCols, + outputKeyColumnNames, distinctColumnIndices, outputValueColumnNames, tag, + partitionCols, numReducers, keySerializeInfo, valueSerializeInfo, false); + } + + public ReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo, + boolean needsOperationPathTagging) { this.keyCols = keyCols; this.numDistributionKeys = numDistributionKeys; this.valueCols = valueCols; @@ -89,126 +75,7 @@ this.keySerializeInfo = keySerializeInfo; this.valueSerializeInfo = valueSerializeInfo; this.distinctColumnIndices = distinctColumnIndices; + this.needsOperationPathTagging = needsOperationPathTagging; } - public java.util.ArrayList getOutputKeyColumnNames() { - return outputKeyColumnNames; - } - - public void setOutputKeyColumnNames( - java.util.ArrayList outputKeyColumnNames) { - this.outputKeyColumnNames = outputKeyColumnNames; - } - - public java.util.ArrayList getOutputValueColumnNames() { - return outputValueColumnNames; - } - - public void setOutputValueColumnNames( - java.util.ArrayList outputValueColumnNames) { - this.outputValueColumnNames = outputValueColumnNames; - } - - @Explain(displayName = "key expressions") - public java.util.ArrayList getKeyCols() { - return keyCols; - } - - public void setKeyCols(final java.util.ArrayList keyCols) { - this.keyCols = keyCols; - } - - public int getNumDistributionKeys() { - return this.numDistributionKeys; - } - - public void setNumDistributionKeys(int numKeys) { - this.numDistributionKeys = numKeys; - } - - @Explain(displayName = "value expressions") - public java.util.ArrayList getValueCols() { - return valueCols; - } - - public void setValueCols(final java.util.ArrayList valueCols) { - this.valueCols = valueCols; - } - - @Explain(displayName = "Map-reduce partition columns") - public java.util.ArrayList getPartitionCols() { - return partitionCols; - } - - public void setPartitionCols( - final java.util.ArrayList partitionCols) { - this.partitionCols = partitionCols; - } - - @Explain(displayName = "tag") - public int getTag() { - return tag; - } - - public void setTag(int tag) { - this.tag = tag; - } - - /** - * Returns the number of reducers for the map-reduce job. -1 means to decide - * the number of reducers at runtime. 
This enables Hive to estimate the number - * of reducers based on the map-reduce input data size, which is only - * available right before we start the map-reduce job. - */ - public int getNumReducers() { - return numReducers; - } - - public void setNumReducers(int numReducers) { - this.numReducers = numReducers; - } - - public TableDesc getKeySerializeInfo() { - return keySerializeInfo; - } - - public void setKeySerializeInfo(TableDesc keySerializeInfo) { - this.keySerializeInfo = keySerializeInfo; - } - - public TableDesc getValueSerializeInfo() { - return valueSerializeInfo; - } - - public void setValueSerializeInfo(TableDesc valueSerializeInfo) { - this.valueSerializeInfo = valueSerializeInfo; - } - - /** - * Returns the sort order of the key columns. - * - * @return null, which means ascending order for all key columns, or a String - * of the same length as key columns, that consists of only "+" - * (ascending order) and "-" (descending order). - */ - @Explain(displayName = "sort order") - public String getOrder() { - return keySerializeInfo.getProperties().getProperty( - org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); - } - - public void setOrder(String orderStr) { - keySerializeInfo.getProperties().setProperty( - org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, - orderStr); - } - - public List> getDistinctColumnIndices() { - return distinctColumnIndices; - } - - public void setDistinctColumnIndices( - List> distinctColumnIndices) { - this.distinctColumnIndices = distinctColumnIndices; - } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java (revision 0) @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.List; + +/** + * ReduceSinkDesc. + * + */ +@Explain(displayName = "Base Reduce Output Operator") +public class BaseReduceSinkDesc implements Serializable { + private static final long serialVersionUID = 1L; + /** + * Key columns are passed to reducer in the "key". + */ + protected java.util.ArrayList keyCols; + protected java.util.ArrayList outputKeyColumnNames; + protected List> distinctColumnIndices; + /** + * Value columns are passed to reducer in the "value". + */ + protected java.util.ArrayList valueCols; + protected java.util.ArrayList outputValueColumnNames; + /** + * Describe how to serialize the key. + */ + protected TableDesc keySerializeInfo; + /** + * Describe how to serialize the value. 
+ */ + protected TableDesc valueSerializeInfo; + + /** + * The tag for this reducesink descriptor. + */ + protected int tag; + + /** + * Number of distribution keys. + */ + protected int numDistributionKeys; + + /** + * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). + * Partition columns decide the reducer that the current row goes to. + * Partition columns are not passed to reducer. + */ + protected java.util.ArrayList partitionCols; + + protected int numReducers; + + public BaseReduceSinkDesc() { + } + + public java.util.ArrayList getOutputKeyColumnNames() { + return outputKeyColumnNames; + } + + public void setOutputKeyColumnNames( + java.util.ArrayList outputKeyColumnNames) { + this.outputKeyColumnNames = outputKeyColumnNames; + } + + public java.util.ArrayList getOutputValueColumnNames() { + return outputValueColumnNames; + } + + public void setOutputValueColumnNames( + java.util.ArrayList outputValueColumnNames) { + this.outputValueColumnNames = outputValueColumnNames; + } + + @Explain(displayName = "key expressions") + public java.util.ArrayList getKeyCols() { + return keyCols; + } + + public void setKeyCols(final java.util.ArrayList keyCols) { + this.keyCols = keyCols; + } + + public int getNumDistributionKeys() { + return this.numDistributionKeys; + } + + public void setNumDistributionKeys(int numKeys) { + this.numDistributionKeys = numKeys; + } + + @Explain(displayName = "value expressions") + public java.util.ArrayList getValueCols() { + return valueCols; + } + + public void setValueCols(final java.util.ArrayList valueCols) { + this.valueCols = valueCols; + } + + @Explain(displayName = "Map-reduce partition columns") + public java.util.ArrayList getPartitionCols() { + return partitionCols; + } + + public void setPartitionCols( + final java.util.ArrayList partitionCols) { + this.partitionCols = partitionCols; + } + + @Explain(displayName = "tag") + public int getTag() { + return tag; + } + + public void setTag(int tag) { + this.tag = tag; + } + + /** + * Returns the number of reducers for the map-reduce job. -1 means to decide + * the number of reducers at runtime. This enables Hive to estimate the number + * of reducers based on the map-reduce input data size, which is only + * available right before we start the map-reduce job. + */ + public int getNumReducers() { + return numReducers; + } + + public void setNumReducers(int numReducers) { + this.numReducers = numReducers; + } + + public TableDesc getKeySerializeInfo() { + return keySerializeInfo; + } + + public void setKeySerializeInfo(TableDesc keySerializeInfo) { + this.keySerializeInfo = keySerializeInfo; + } + + public TableDesc getValueSerializeInfo() { + return valueSerializeInfo; + } + + public void setValueSerializeInfo(TableDesc valueSerializeInfo) { + this.valueSerializeInfo = valueSerializeInfo; + } + + /** + * Returns the sort order of the key columns. + * + * @return null, which means ascending order for all key columns, or a String + * of the same length as key columns, that consists of only "+" + * (ascending order) and "-" (descending order). 
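+ * For example, an order string of "+-" requests ascending order on the first key column and descending order on the second; it can be set through setOrder("+-") below.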
+ */ + @Explain(displayName = "sort order") + public String getOrder() { + return keySerializeInfo.getProperties().getProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); + } + + public void setOrder(String orderStr) { + keySerializeInfo.getProperties().setProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, + orderStr); + } + + public List> getDistinctColumnIndices() { + return distinctColumnIndices; + } + + public void setDistinctColumnIndices( + List> distinctColumnIndices) { + this.distinctColumnIndices = distinctColumnIndices; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java (revision 0) @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.BaseReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * Reduce Sink Operator sends output to the reduce stage. + **/ +public abstract class BaseReduceSinkOperator extends TerminalOperator + implements Serializable { + + private static final long serialVersionUID = 1L; + protected static final Log LOG = LogFactory.getLog(BaseReduceSinkOperator.class + .getName()); + + /** + * The evaluators for the key columns. Key columns decide the sort order on + * the reducer side. Key columns are passed to the reducer in the "key". + */ + protected transient ExprNodeEvaluator[] keyEval; + /** + * The evaluators for the value columns. 
Value columns are passed to reducer + * in the "value". + */ + protected transient ExprNodeEvaluator[] valueEval; + /** + * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in + * Hive language). Partition columns decide the reducer that the current row + * goes to. Partition columns are not passed to reducer. + */ + protected transient ExprNodeEvaluator[] partitionEval; + + // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is + // ready + transient Serializer keySerializer; + transient boolean keyIsText; + transient Serializer valueSerializer; + transient int tag; + transient byte[] tagByte = new byte[1]; + transient protected int numDistributionKeys; + transient protected int numDistinctExprs; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + + try { + keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getKeyCols()) { + keyEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + numDistributionKeys = conf.getNumDistributionKeys(); + distinctColIndices = conf.getDistinctColumnIndices(); + numDistinctExprs = distinctColIndices.size(); + + valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getValueCols()) { + valueEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getPartitionCols()) { + partitionEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + tag = conf.getTag(); + tagByte[0] = (byte) tag; + LOG.info("Using tag = " + tag); + + TableDesc keyTableDesc = conf.getKeySerializeInfo(); + keySerializer = (Serializer) keyTableDesc.getDeserializerClass() + .newInstance(); + keySerializer.initialize(null, keyTableDesc.getProperties()); + keyIsText = keySerializer.getSerializedClass().equals(Text.class); + + TableDesc valueTableDesc = conf.getValueSerializeInfo(); + valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() + .newInstance(); + valueSerializer.initialize(null, valueTableDesc.getProperties()); + + firstRow = true; + initializeChildren(hconf); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + transient InspectableObject tempInspectableObject = new InspectableObject(); + transient HiveKey keyWritable = new HiveKey(); + transient Writable value; + + transient StructObjectInspector keyObjectInspector; + transient StructObjectInspector valueObjectInspector; + transient ObjectInspector[] partitionObjectInspectors; + + transient Object[][] cachedKeys; + transient Object[] cachedValues; + transient List> distinctColIndices; + + boolean firstRow; + + transient Random random; + + /** + * Initializes array of ExprNodeEvaluator. Adds Union field for distinct + * column indices for group by. + * Puts the return values into a StructObjectInspector with output column + * names. + * + * If distinctColIndices is empty, the object inspector is same as + * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} + */ + protected static StructObjectInspector initEvaluatorsAndReturnStruct( + ExprNodeEvaluator[] evals, List> distinctColIndices, + List outputColNames, + int length, ObjectInspector rowInspector) + throws HiveException { + int inspectorLen = evals.length > length ? 
length + 1 : evals.length; + List sois = new ArrayList(inspectorLen); + + // keys + ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector); + sois.addAll(Arrays.asList(fieldObjectInspectors)); + + if (evals.length > length) { + // union keys + List uois = new ArrayList(); + for (List distinctCols : distinctColIndices) { + List names = new ArrayList(); + List eois = new ArrayList(); + int numExprs = 0; + for (int i : distinctCols) { + names.add(HiveConf.getColumnInternalName(numExprs)); + eois.add(evals[i].initialize(rowInspector)); + numExprs++; + } + uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois)); + } + UnionObjectInspector uoi = + ObjectInspectorFactory.getStandardUnionObjectInspector(uois); + sois.add(uoi); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois ); + } + + @Override + public abstract void processOp(Object row, int tag) throws HiveException; + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return "BaseReduceSink"; + } + + @Override + public OperatorType getType() { + return null; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (working copy) @@ -61,6 +61,7 @@ private Reporter rp; private boolean abort = false; private boolean isTagged = false; + private boolean isOperationPathTagged = false; //If operation path is tagged private long cntr = 0; private long nextCntr = 1; @@ -116,6 +117,7 @@ reducer.setParentOperators(null); // clear out any parents as reducer is the // root isTagged = gWork.getNeedsTagging(); + isOperationPathTagged = gWork.getNeedsOperationPathTagging(); try { keyTableDesc = gWork.getKeyDesc(); inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc @@ -164,8 +166,9 @@ private BytesWritable groupKey; - ArrayList row = new ArrayList(3); + ArrayList row = new ArrayList(4); ByteWritable tag = new ByteWritable(); + ByteWritable operationPathTags = new ByteWritable(); public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { @@ -188,6 +191,14 @@ keyWritable.setSize(size); } + operationPathTags.set((byte)0); + if (isOperationPathTagged) { + // remove the operation plan tag + int size = keyWritable.getSize() - 1; + operationPathTags.set(keyWritable.get()[size]); + keyWritable.setSize(size); + } + if (!keyWritable.equals(groupKey)) { // If a operator wants to do some work at the beginning of a group if (groupKey == null) { // the first group @@ -234,6 +245,7 @@ row.add(valueObject[tag.get()]); // The tag is not used any more, we should remove it. row.add(tag); + row.add(operationPathTags); if (isLogInfoEnabled) { cntr++; if (cntr == nextCntr) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0) @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.io.LongWritable; + +/** + * Correlation composite operator implementation. + * This operator is used only in map phase. + * Suppose that multiple sub-queries involve a common table, + * to share the table scan, CorrelationCompositeOperator will be used. + * For example, suppose that the common table is T and predicates P1 and P2 will be used + * in sub-queries SQ1 and SQ2, respectively. The CorrelationCompositeOperator + * will apply P1 and P2 on the record and tag the record based on if P1 or P2 is true. 
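+ * For instance, if SQ1 filters T with key > 10 and SQ2 filters T with key < 100 (an illustrative pair of predicates), a row of T that passes only the first predicate is forwarded once, tagged with SQ1's operation path only, so both sub-queries share a single scan of T.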
+ **/ +public class CorrelationCompositeOperator extends Operator implements Serializable { + + static final private Log LOG = LogFactory.getLog(Driver.class.getName()); + static final private LogHelper console = new LogHelper(LOG); + + public static enum Counter { + FORWARDED + } + + /** + * + */ + private static final long serialVersionUID = 1L; + + private ReduceSinkOperator correspondingReduceSinkOperators; + + private transient final LongWritable forwarded_count; + + private transient boolean firstRow; + + public CorrelationCompositeOperator() { + super(); + forwarded_count = new LongWritable(); + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CCO"); + } + + private Object[] rowBuffer; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + firstRow = true; + rowBuffer = new Object[parentOperators.size()]; + correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator(); + allOperationPathTags = conf.getAllOperationPathTags(); + statsMap.put(Counter.FORWARDED, forwarded_count); + outputObjInspector = ObjectInspectorUtils.getStandardObjectInspector(outputObjInspector, ObjectInspectorCopyOption.JAVA); + + //initialize its children + initializeChildren(hconf); + } + + private int[] allOperationPathTags; + + @Override + public void processOp(Object row, int tag) throws HiveException { + rowBuffer[tag] = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[tag], ObjectInspectorCopyOption.JAVA); + } + + @Override + public void setRowNumber(long rowNumber) { + this.rowNumber = rowNumber; + if (childOperators == null) { + return; + } + for (int i = 0; i < childOperatorsArray.length; i++) { + assert rowNumber >= childOperatorsArray[i].getRowNumber(); + if (rowNumber != childOperatorsArray[i].getRowNumber()) { + childOperatorsArray[i].setRowNumber(rowNumber); + } + } + if (firstRow) { + for (int i = 0; i < rowBuffer.length; i++) { + rowBuffer[i] = null; + } + firstRow = false; + } else { + ArrayList operationPathTags = new ArrayList(); + boolean isForward = false; + Object forwardedRow = null; + for (int i = 0; i < rowBuffer.length; i++) { + if (rowBuffer[i] != null){ + isForward = true; + operationPathTags.add(allOperationPathTags[i]); + if (forwardedRow == null) { + forwardedRow = rowBuffer[i]; + } + } + } + if (isForward) { + assert correspondingReduceSinkOperators != null; + correspondingReduceSinkOperators.setOperationPathTags(operationPathTags); + forwarded_count.set(forwarded_count.get() + 1); + try { + forward(forwardedRow, null); + } catch (HiveException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + for (int i = 0; i < rowBuffer.length; i++) { + rowBuffer[i] = null; + } + forwardedRow = null; + } + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if(!abort){ + ArrayList operationPathTags = new ArrayList(); + boolean isForward = false; + Object forwardedRow = null; + for (int i = 0; i < rowBuffer.length; i++) { + if (rowBuffer[i] != null){ + isForward = true; + operationPathTags.add(allOperationPathTags[i]); + if (forwardedRow == null) { + forwardedRow = rowBuffer[i]; + } + } + } + if (isForward) { + assert correspondingReduceSinkOperators != null; + correspondingReduceSinkOperators.setOperationPathTags(operationPathTags); + forwarded_count.set(forwarded_count.get() + 1); + try { + forward(forwardedRow, null); + } catch (HiveException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + 
} + for (int i = 0; i < rowBuffer.length; i++) { + rowBuffer[i] = null; + } + } + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (revision 1237326) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (working copy) @@ -49,6 +49,16 @@ */ private boolean gatherStats; + private boolean forwardRowNumber = false; + + public boolean isForwardRowNumber() { + return forwardRowNumber; + } + + public void setForwardRowNumber(boolean forwardRowNumber) { + this.forwardRowNumber = forwardRowNumber; + } + private ExprNodeDesc filterExpr; public static final String FILTER_EXPR_CONF_STR =