diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f86d6a7..cde84f7 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -507,6 +507,7 @@ public class HiveConf extends Configuration {
HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+ HIVEOPTCORRELATION("hive.optimize.correlation", false), // exploit intra-query correlations
// optimize skewed join by changing the query plan at compile time
HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME("hive.optimize.skewjoin.compiletime", false),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 4a59fb6..6de65dc 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -928,6 +928,12 @@
+<property>
+  <name>hive.optimize.correlation</name>
+  <value>false</value>
+  <description>exploit intra-query correlations.</description>
+</property>
+
<property>
  <name>hive.exec.dynamic.partition</name>
  <value>true</value>
  <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
diff --git ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
index 8c9bd26..6a33feb 100644
--- ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
+++ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
@@ -28,7 +28,10 @@ public enum OperatorType implements org.apache.thrift.TEnum {
LATERALVIEWJOIN(14),
LATERALVIEWFORWARD(15),
HASHTABLESINK(16),
- HASHTABLEDUMMY(17);
+ HASHTABLEDUMMY(17),
+ CORRELATIONCOMPOSITE(18),
+ CORRELATIONLOCALSIMULATIVEREDUCESINK(19),
+ CORRELATIONREDUCERDISPATCH(20);
private final int value;
@@ -85,6 +88,12 @@ public enum OperatorType implements org.apache.thrift.TEnum {
return HASHTABLESINK;
case 17:
return HASHTABLEDUMMY;
+ case 18:
+ return CORRELATIONCOMPOSITE;
+ case 19:
+ return CORRELATIONLOCALSIMULATIVEREDUCESINK;
+ case 20:
+ return CORRELATIONREDUCERDISPATCH;
default:
return null;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
new file mode 100644
index 0000000..0443545
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BaseReduceSinkOperator
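+ * Base class shared by ReduceSinkOperator and CorrelationLocalSimulativeReduceSinkOperator. It
+ * holds the common key/value/partition evaluators and serializers, and the helper that builds the
+ * key StructObjectInspector (including the union field used for distinct columns).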
+ **/
+public abstract class BaseReduceSinkOperator<T extends BaseReduceSinkDesc> extends
+    TerminalOperator<T> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ protected static final Log LOG = LogFactory.getLog(BaseReduceSinkOperator.class
+ .getName());
+
+ /**
+ * The evaluators for the key columns. Key columns decide the sort order on
+ * the reducer side. Key columns are passed to the reducer in the "key".
+ */
+ protected transient ExprNodeEvaluator[] keyEval;
+ /**
+ * The evaluators for the value columns. Value columns are passed to reducer
+ * in the "value".
+ */
+ protected transient ExprNodeEvaluator[] valueEval;
+ /**
+ * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+ * Hive language). Partition columns decide the reducer that the current row
+ * goes to. Partition columns are not passed to reducer.
+ */
+ protected transient ExprNodeEvaluator[] partitionEval;
+
+ // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
+ // ready
+ protected transient Serializer keySerializer;
+ protected transient boolean keyIsText;
+ protected transient Serializer valueSerializer;
+ protected transient int tag;
+ protected transient byte[] tagByte = new byte[1];
+ protected transient int numDistributionKeys;
+ protected transient int numDistinctExprs;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+
+ try {
+ keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getKeyCols()) {
+ keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ numDistributionKeys = conf.getNumDistributionKeys();
+ distinctColIndices = conf.getDistinctColumnIndices();
+ numDistinctExprs = distinctColIndices.size();
+
+ valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getValueCols()) {
+ valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getPartitionCols()) {
+ partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ tag = conf.getTag();
+ tagByte[0] = (byte) tag;
+ LOG.info("Using tag = " + tag);
+
+ TableDesc keyTableDesc = conf.getKeySerializeInfo();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
+ keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ TableDesc valueTableDesc = conf.getValueSerializeInfo();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ isFirstRow = true;
+ initializeChildren(hconf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected transient InspectableObject tempInspectableObject = new InspectableObject();
+ protected transient HiveKey keyWritable = new HiveKey();
+ protected transient Writable value;
+
+ protected transient StructObjectInspector keyObjectInspector;
+ protected transient StructObjectInspector valueObjectInspector;
+ protected transient ObjectInspector[] partitionObjectInspectors;
+
+ protected transient Object[][] cachedKeys;
+ protected transient Object[] cachedValues;
+  protected transient List<List<Integer>> distinctColIndices;
+
+ protected boolean isFirstRow;
+
+ protected transient Random random;
+
+ /**
+ * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
+ * column indices for group by.
+ * Puts the return values into a StructObjectInspector with output column
+ * names.
+ *
+ * If distinctColIndices is empty, the object inspector is same as
+ * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
+ */
+ protected static StructObjectInspector initEvaluatorsAndReturnStruct(
+      ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
+      List<String> outputColNames,
+ int length, ObjectInspector rowInspector)
+ throws HiveException {
+ int inspectorLen = evals.length > length ? length + 1 : evals.length;
+    List<ObjectInspector> sois = new ArrayList<ObjectInspector>(inspectorLen);
+
+ // keys
+ ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
+ sois.addAll(Arrays.asList(fieldObjectInspectors));
+
+ if (evals.length > length) {
+ // union keys
+      List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
+      for (List<Integer> distinctCols : distinctColIndices) {
+        List<String> names = new ArrayList<String>();
+        List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
+ int numExprs = 0;
+ for (int i : distinctCols) {
+ names.add(HiveConf.getColumnInternalName(numExprs));
+ eois.add(evals[i].initialize(rowInspector));
+ numExprs++;
+ }
+ uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
+ }
+ UnionObjectInspector uoi =
+ ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
+ sois.add(uoi);
+ }
+ return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois);
+ }
+
+ @Override
+ public abstract void processOp(Object row, int tag) throws HiveException;
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return "BaseReduceSink";
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
new file mode 100644
index 0000000..dfe3119
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Correlation composite operator implementation. This operator is used only in the map phase to
+ * share a table scan. Suppose that multiple operation paths (e.g. two different predicates in
+ * different sub-queries) share a common table. Every row is processed by each of these operation
+ * paths. To record which operation paths actually forward a given row, the
+ * CorrelationCompositeOperator buffers the rows forwarded by its parents and then tags the row
+ * with an operation path tag indicating which paths forwarded it. Since the operation path tag
+ * used in ReduceSinkOperator is a single byte, this operator can have at most 8 parents
+ * (operation paths). For example, suppose that the common table is T and predicates P1 and P2 are
+ * used in sub-queries SQ1 and SQ2, respectively. The CorrelationCompositeOperator applies P1 and
+ * P2 to each row and tags the record according to whether P1 or P2 holds.
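+ * A hypothetical query with this shape (table, column, and predicate names are illustrative only):
+ *   SELECT sq1.key FROM (SELECT key FROM T WHERE P1) sq1
+ *                  JOIN (SELECT key FROM T WHERE P2) sq2 ON (sq1.key = sq2.key);
+ * With the shared table scan, T is read once and each row is tagged with the operation paths
+ * (SQ1, SQ2, or both) whose predicate it satisfies.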
+ **/
+public class CorrelationCompositeOperator extends Operator<CorrelationCompositeDesc> implements
+    Serializable {
+
+ public static enum Counter {
+ FORWARDED
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private ReduceSinkOperator correspondingReduceSinkOperators;
+
+ private transient final LongWritable forwarded_count;
+
+ private transient boolean isFirstRow;
+
+ private int[] allOperationPathTags;
+
+ private Object[] rowBuffer; // buffer the output from multiple parents
+
+ public CorrelationCompositeOperator() {
+ super();
+ forwarded_count = new LongWritable();
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ isFirstRow = true;
+ rowBuffer = new Object[parentOperators.size()];
+ correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator();
+ allOperationPathTags = conf.getAllOperationPathTags();
+ statsMap.put(Counter.FORWARDED, forwarded_count);
+ outputObjInspector =
+ ObjectInspectorUtils.getStandardObjectInspector(outputObjInspector,
+ ObjectInspectorCopyOption.JAVA);
+
+ // initialize its children
+ initializeChildren(hconf);
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ rowBuffer[tag] =
+ ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[tag],
+ ObjectInspectorCopyOption.JAVA);
+ }
+
+ private void evaluateBuffer() throws HiveException {
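+    // Determine which operation paths forwarded the buffered row, hand their tags to the
+    // corresponding ReduceSinkOperator, and forward a single copy of the row downstream.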
+    List<Integer> operationPathTags = new ArrayList<Integer>();
+ boolean isForward = false;
+ Object forwardedRow = null;
+ for (int i = 0; i < rowBuffer.length; i++) {
+ if (rowBuffer[i] != null) {
+ isForward = true;
+ operationPathTags.add(allOperationPathTags[i]);
+ if (forwardedRow == null) {
+ forwardedRow = rowBuffer[i];
+ }
+ }
+ }
+ if (isForward) {
+ assert correspondingReduceSinkOperators != null;
+ correspondingReduceSinkOperators.setOperationPathTags(operationPathTags);
+ forwarded_count.set(forwarded_count.get() + 1);
+ forward(forwardedRow, null);
+ }
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ }
+
+ @Override
+ public void setRowNumber(long rowNumber) throws HiveException {
+ this.rowNumber = rowNumber;
+ if (childOperators == null) {
+ return;
+ }
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ assert rowNumber >= childOperatorsArray[i].getRowNumber();
+ if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+ childOperatorsArray[i].setRowNumber(rowNumber);
+ }
+ }
+ if (isFirstRow) {
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ isFirstRow = false;
+ } else {
+ evaluateBuffer();
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort) {
+ evaluateBuffer();
+ }
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CCO";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.CORRELATIONCOMPOSITE;
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
new file mode 100644
index 0000000..da86cd9
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * CorrelationLocalSimulativeReduceSinkOperator simulates a ReduceSinkOperator and sends its
+ * output to another operator (JOIN or GBY). It is used only in the reduce phase. Basically, it is
+ * a bridge from one JOIN or GBY operator to another JOIN or GBY operator, and it takes care of
+ * the startGroup and endGroup actions of its succeeding JOIN or GBY operator.
+ * Example: a query involves a JOIN operator and a GBY operator, and the GBY operator consumes the
+ * output of the JOIN operator. If the join keys and group by keys are the same, there is no need
+ * to shuffle the data again, since the data has already been partitioned by the JOIN operator.
+ * Thus, the CorrelationOptimizer replaces the ReduceSinkOperator between the JOIN and GBY
+ * operators with a CorrelationLocalSimulativeReduceSinkOperator, and the JOIN and GBY operators
+ * are executed in a single reduce phase.
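+ * A hypothetical query of this shape (table and column names are illustrative only):
+ *   SELECT t1.key, count(*) FROM t1 JOIN t2 ON (t1.key = t2.key) GROUP BY t1.key;
+ * The join key and the group by key are both t1.key, so the GROUP BY can reuse the partitioning
+ * produced for the JOIN and both operators run in the same reduce phase.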
+ **/
+public class CorrelationLocalSimulativeReduceSinkOperator
+    extends BaseReduceSinkOperator<CorrelationLocalSimulativeReduceSinkDesc> {
+
+ private static final long serialVersionUID = 1L;
+ protected static final Log LOG = LogFactory.getLog(
+ CorrelationLocalSimulativeReduceSinkOperator.class.getName());
+
+ private transient TableDesc keyTableDesc;
+
+ private transient Deserializer inputKeyDeserializer;
+
+ private transient SerDe inputValueDeserializer;
+
+ private transient ByteWritable tagWritable;
+
+ private transient ObjectInspector outputKeyObjectInspector;
+ private transient ObjectInspector outputValueObjectInspector;
+
+  private List<Object> forwardedRow;
+ private Object keyObject;
+ private Object valueObject;
+
+ private BytesWritable groupKey;
+
+ private static String[] fieldNames;
+
+ static {
+    List<String> fieldNameArray = new ArrayList<String>();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+ fieldNameArray.add(r.toString());
+ }
+ fieldNames = fieldNameArray.toArray(new String[0]);
+ }
+
+ public CorrelationLocalSimulativeReduceSinkOperator() {
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+    forwardedRow = new ArrayList<Object>(3);
+ tagByte = new byte[1];
+ tagWritable = new ByteWritable();
+ tempInspectableObject = new InspectableObject();
+ keyWritable = new HiveKey();
+ assert childOperatorsArray.length == 1;
+ try {
+ keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getKeyCols()) {
+ keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ numDistributionKeys = conf.getNumDistributionKeys();
+ distinctColIndices = conf.getDistinctColumnIndices();
+ numDistinctExprs = distinctColIndices.size();
+
+ valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getValueCols()) {
+ valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ tag = conf.getTag();
+ tagByte[0] = (byte) tag;
+ tagWritable.set(tagByte[0]);
+ LOG.info("Using tag = " + tag);
+
+      keyTableDesc = conf.getKeySerializeInfo();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
+ keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+ .getDeserializerClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+ outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ TableDesc valueTableDesc = conf.getValueSerializeInfo();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ inputValueDeserializer = (SerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getDeserializerClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc
+ .getProperties());
+ outputValueObjectInspector = inputValueDeserializer.getObjectInspector();
+
+ ObjectInspector rowInspector = inputObjInspectors[0];
+
+ keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
+ distinctColIndices,
+ conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
+ valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
+ .getOutputValueColumnNames(), rowInspector);
+ int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+ int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
+ numDistributionKeys;
+ cachedKeys = new Object[numKeys][keyLen];
+ cachedValues = new Object[valueEval.length];
+ assert cachedKeys.length == 1;
+
+      List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(outputKeyObjectInspector);
+ ois.add(outputValueObjectInspector);
+ ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+
+ LOG.info("Simulative ReduceSink inputObjInspectors"
+ + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
+
+ LOG.info("Simulative ReduceSink outputObjInspectors "
+ + this.getChildOperators().get(0).getParentOperators().indexOf(this) +
+ " " + ((StructObjectInspector) outputObjInspector).getTypeName());
+
+ initializeChildren(hconf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ try {
+ // Evaluate the value
+ for (int i = 0; i < valueEval.length; i++) {
+ cachedValues[i] = valueEval[i].evaluate(row);
+ }
+ // Serialize the value
+ value = valueSerializer.serialize(cachedValues, valueObjectInspector);
+ valueObject = inputValueDeserializer.deserialize(value);
+
+ // Evaluate the keys
+ Object[] distributionKeys = new Object[numDistributionKeys];
+ for (int i = 0; i < numDistributionKeys; i++) {
+ distributionKeys[i] = keyEval[i].evaluate(row);
+ }
+
+ if (numDistinctExprs > 0) {
+ // with distinct key(s)
+ for (int i = 0; i < numDistinctExprs; i++) {
+ System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys);
+ Object[] distinctParameters =
+ new Object[distinctColIndices.get(i).size()];
+ for (int j = 0; j < distinctParameters.length; j++) {
+ distinctParameters[j] =
+ keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
+ }
+ cachedKeys[i][numDistributionKeys] =
+ new StandardUnion((byte) i, distinctParameters);
+ }
+ } else {
+ // no distinct key
+ System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+ }
+
+ for (int i = 0; i < cachedKeys.length; i++) {
+ if (keyIsText) {
+ Text key = (Text) keySerializer.serialize(cachedKeys[i],
+ keyObjectInspector);
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ } else {
+ // Must be BytesWritable
+ BytesWritable key = (BytesWritable) keySerializer.serialize(
+ cachedKeys[i], keyObjectInspector);
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ }
+
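+        // Simulate reduce-side group boundaries: when the serialized key changes, end the child's
+        // previous group (if any) and start a new group with the newly deserialized key object.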
+ if (!keyWritable.equals(groupKey)) {
+ try {
+ keyObject = inputKeyDeserializer.deserialize(keyWritable);
+ } catch (Exception e) {
+ throw new HiveException(
+ "Hive Runtime Error: Unable to deserialize reduce input key from "
+ + Utilities.formatBinaryString(keyWritable.get(), 0,
+ keyWritable.getSize()) + " with properties "
+ + keyTableDesc.getProperties(), e);
+ }
+ if (groupKey == null) { // the first group
+ groupKey = new BytesWritable();
+ } else {
+ // if its child has not been ended, end it
+ if (!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())) {
+ childOperatorsArray[0].endGroup();
+ }
+ }
+ groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+ if (!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())) {
+ childOperatorsArray[0].startGroup();
+ childOperatorsArray[0].setGroupKeyObject(keyObject);
+ childOperatorsArray[0].setBytesWritableGroupKey(groupKey);
+ }
+ }
+ forwardedRow.clear();
+ forwardedRow.add(keyObject);
+ forwardedRow.add(valueObject);
+ forwardedRow.add(tagWritable);
+ forward(forwardedRow, outputObjInspector);
+ }
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort) {
+      Operator<? extends OperatorDesc> child = childOperatorsArray[0];
+ if (child.allInitializedParentsAreClosed()) {
+        LOG.info("All parents of " + child.getName() + " (id: " + child.getIdentifier() +
+            ") have been closed. Invoking its endGroup");
+ childOperatorsArray[0].endGroup();
+ }
+ }
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ // do nothing
+ }
+
+ @Override
+ public void endGroup() throws HiveException {
+ // do nothing
+ }
+
+ @Override
+ public void setGroupKeyObject(Object keyObject) {
+ // do nothing
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CLSReduceSink";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.CORRELATIONLOCALSIMULATIVEREDUCESINK;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
new file mode 100644
index 0000000..83db208
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+/**
+ * CorrelationReducerDispatchOperator is used in MapReduce jobs optimized by the
+ * CorrelationOptimizer. If used, CorrelationReducerDispatchOperator is the first operator in the
+ * reduce phase. When multiple operation paths have been merged into a single one, it dispatches
+ * each record to the corresponding JOIN or GBY operators. Every child of this operator is
+ * associated with a DispatchHandler, which evaluates the input row of this operator and then
+ * selects the corresponding fields for its associated child.
+ */
+public class CorrelationReducerDispatchOperator extends Operator<CorrelationReducerDispatchDesc>
+    implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static String[] fieldNames;
+ static {
+    List<String> fieldNameArray = new ArrayList<String>();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+ fieldNameArray.add(r.toString());
+ }
+ fieldNames = fieldNameArray.toArray(new String[0]);
+ }
+
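+  /**
+   * Evaluates the key and value SELECT expressions configured for one (inputTag, child, outputTag)
+   * combination and re-tags the row with the output tag before it is handed to that child.
+   */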
+ protected static class DispatchHandler {
+
+ protected Log l4j = LogFactory.getLog(this.getClass().getName());
+
+ private final ObjectInspector[] inputObjInspector;
+ private ObjectInspector outputObjInspector;
+ private ObjectInspector keyObjInspector;
+ private ObjectInspector valueObjInspector;
+ private final byte inputTag;
+ private final byte outputTag;
+ private final byte childIndx;
+ private final ByteWritable outputTagByteWritable;
+ private final SelectDesc keySelectDesc;
+ private final SelectDesc valueSelectDesc;
+ private ExprNodeEvaluator[] keyEval;
+ private ExprNodeEvaluator[] eval;
+
+ // counters for debugging
+ private transient long cntr = 0;
+ private transient long nextCntr = 1;
+
+ private long getNextCntr(long cntr) {
+      // A very simple counter to keep track of the number of rows processed by an operator.
+      // It triggers a log line every 1 million rows once past 1 million, and at exponentially
+      // increasing row counts before that.
+ if (cntr >= 1000000) {
+ return cntr + 1000000;
+ }
+ return 10 * cntr;
+ }
+
+ public long getCntr() {
+ return this.cntr;
+ }
+
+ private final Log LOG;
+ private final boolean isLogInfoEnabled;
+ private final String id;
+
+ public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx,
+ byte outputTag,
+ SelectDesc valueSelectDesc, SelectDesc keySelectDesc, Log LOG, String id)
+ throws HiveException {
+ this.inputObjInspector = inputObjInspector;
+ assert this.inputObjInspector.length == 1;
+ this.inputTag = inputTag;
+ this.childIndx = childIndx;
+ this.outputTag = outputTag;
+ this.valueSelectDesc = valueSelectDesc;
+ this.keySelectDesc = keySelectDesc;
+ this.outputTagByteWritable = new ByteWritable(outputTag);
+ this.LOG = LOG;
+ this.isLogInfoEnabled = LOG.isInfoEnabled();
+ this.id = id;
+ init();
+ }
+
+ private void init() throws HiveException {
+      List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ if (keySelectDesc.isSelStarNoCompute()) {
+ ois.add((ObjectInspector) ((List) inputObjInspector[0]).get(0));
+ } else {
+        List<ExprNodeDesc> colList = this.keySelectDesc.getColList();
+ keyEval = new ExprNodeEvaluator[colList.size()];
+ for (int k = 0; k < colList.size(); k++) {
+ assert (colList.get(k) != null);
+ keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+ }
+ keyObjInspector =
+ initEvaluatorsAndReturnStruct(keyEval, keySelectDesc
+ .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0])
+ .getAllStructFieldRefs().get(0).getFieldObjectInspector());
+
+ ois.add(keyObjInspector);
+ l4j.info("Key: input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT inputOIForThisTag"
+ + ((StructObjectInspector) inputObjInspector[0]).getTypeName());
+ }
+ if (valueSelectDesc.isSelStarNoCompute()) {
+ ois.add((ObjectInspector) ((List) inputObjInspector[0]).get(1));
+ } else {
+        List<ExprNodeDesc> colList = this.valueSelectDesc.getColList();
+ eval = new ExprNodeEvaluator[colList.size()];
+ for (int k = 0; k < colList.size(); k++) {
+ assert (colList.get(k) != null);
+ eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+ }
+ valueObjInspector =
+ initEvaluatorsAndReturnStruct(eval, valueSelectDesc
+ .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0])
+ .getAllStructFieldRefs().get(1).getFieldObjectInspector());
+
+ ois.add(valueObjInspector);
+ l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT inputOIForThisTag"
+ + ((StructObjectInspector) inputObjInspector[0]).getTypeName());
+ }
+ ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+ l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT outputObjInspector"
+ + ((StructObjectInspector) outputObjInspector).getTypeName());
+ }
+
+ public ObjectInspector getOutputObjInspector() {
+ return outputObjInspector;
+ }
+
+ public Object process(Object row) throws HiveException {
+      List<Object> keyOutput = (keyEval == null) ? null : new ArrayList<Object>(keyEval.length);
+      Object[] valueOutput = (eval == null) ? null : new Object[eval.length];
+      List<Object> outputRow = new ArrayList<Object>(3);
+      List<Object> thisRow = (List<Object>) row;
+ if (keySelectDesc.isSelStarNoCompute()) {
+ outputRow.add(thisRow.get(0));
+ } else {
+ Object key = thisRow.get(0);
+ for (int j = 0; j < keyEval.length; j++) {
+ try {
+ keyOutput.add(keyEval[j].evaluate(key));
+ } catch (HiveException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new HiveException("Error evaluating "
+ + keySelectDesc.getColList().get(j).getExprString(), e);
+ }
+ }
+ outputRow.add(keyOutput);
+ }
+
+ if (valueSelectDesc.isSelStarNoCompute()) {
+ outputRow.add(thisRow.get(1));
+ } else {
+ Object value = thisRow.get(1);
+ for (int j = 0; j < eval.length; j++) {
+ try {
+ valueOutput[j] = eval[j].evaluate(value);
+ } catch (HiveException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new HiveException("Error evaluating "
+ + valueSelectDesc.getColList().get(j).getExprString(), e);
+ }
+ }
+ outputRow.add(valueOutput);
+ }
+ outputRow.add(outputTagByteWritable);
+
+ if (isLogInfoEnabled) {
+ cntr++;
+ if (cntr == nextCntr) {
+ LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", "
+ + outputTag + "), forwarding " + cntr + " rows");
+ nextCntr = getNextCntr(cntr);
+ }
+ }
+
+ return outputRow;
+ }
+
+ public void printCloseOpLog() {
+ LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", "
+ + outputTag + "), forwarded " + cntr + " rows");
+ }
+ }
+
+  // inputTag->(Child->List<outputTag>)
+  private Map<Integer, Map<Integer, List<Integer>>> dispatchConf;
+  // inputTag->(Child->List<SelectDesc>)
+  private Map<Integer, Map<Integer, List<SelectDesc>>> dispatchValueSelectDescConf;
+  // inputTag->(Child->List<SelectDesc>)
+  private Map<Integer, Map<Integer, List<SelectDesc>>> dispatchKeySelectDescConf;
+  // inputTag->(Child->List<DispatchHandler>)
+  private Map<Integer, Map<Integer, List<DispatchHandler>>> dispatchHandlers;
+  // Child->(outputTag->DispatchHandler)
+  private Map<Integer, Map<Integer, DispatchHandler>> child2OutputTag2DispatchHandlers;
+  // Child->Child's inputObjInspectors
+  private Map<Integer, ObjectInspector[]> childInputObjInspectors;
+
+ private int operationPathTag;
+ private int inputTag;
+
+ private Object[] lastDispatchedRows;
+ private int[] lastDispatchedTags;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ dispatchConf = conf.getDispatchConf();
+ dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf();
+ dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf();
+    dispatchHandlers = new HashMap<Integer, Map<Integer, List<DispatchHandler>>>();
+    for (Entry<Integer, Map<Integer, List<Integer>>> entry : dispatchConf.entrySet()) {
+      Map<Integer, List<DispatchHandler>> tmp =
+          new HashMap<Integer, List<DispatchHandler>>();
+      for (Entry<Integer, List<Integer>> child2outputTag : entry.getValue().entrySet()) {
+        tmp.put(child2outputTag.getKey(), new ArrayList<DispatchHandler>());
+ int indx = 0;
+ for (Integer outputTag : child2outputTag.getValue()) {
+ ObjectInspector[] thisInputObjectInspector =
+ new ObjectInspector[] {inputObjInspectors[entry.getKey()]};
+ Integer thisInputTag = entry.getKey();
+ Integer thisChildIndx = child2outputTag.getKey();
+ SelectDesc thisValueSelectDesc = dispatchValueSelectDescConf.get(thisInputTag)
+ .get(thisChildIndx).get(indx);
+ SelectDesc thisKeySelectDesc = dispatchKeySelectDescConf.get(thisInputTag)
+ .get(thisChildIndx).get(indx);
+ tmp.get(child2outputTag.getKey()).add(
+ new DispatchHandler(thisInputObjectInspector,
+ thisInputTag.byteValue(), thisChildIndx.byteValue(), outputTag.byteValue(),
+ thisValueSelectDesc, thisKeySelectDesc, LOG, id));
+ indx++;
+ }
+ }
+ dispatchHandlers.put(entry.getKey(), tmp);
+ }
+
+    child2OutputTag2DispatchHandlers = new HashMap<Integer, Map<Integer, DispatchHandler>>();
+    for (Entry<Integer, Map<Integer, List<Integer>>> entry : dispatchConf.entrySet()) {
+      for (Entry<Integer, List<Integer>> child2outputTag : entry.getValue().entrySet()) {
+        if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) {
+          child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(),
+              new HashMap<Integer, DispatchHandler>());
+ }
+ int indx = 0;
+ for (Integer outputTag : child2outputTag.getValue()) {
+ child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()).
+ put(outputTag,
+ dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx));
+ indx++;
+ }
+ }
+ }
+
+    childInputObjInspectors = new HashMap<Integer, ObjectInspector[]>();
+    for (Entry<Integer, Map<Integer, DispatchHandler>> entry : child2OutputTag2DispatchHandlers
+        .entrySet()) {
+      Integer l = Collections.max(entry.getValue().keySet());
+      ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1];
+      for (Entry<Integer, DispatchHandler> e : entry.getValue().entrySet()) {
+ if (e.getKey().intValue() == -1) {
+ assert childObjInspectors.length == 1;
+ childObjInspectors[0] = e.getValue().getOutputObjInspector();
+ } else {
+ childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector();
+ }
+ }
+ childInputObjInspectors.put(entry.getKey(), childObjInspectors);
+ }
+
+ lastDispatchedRows = new Object[childOperatorsArray.length];
+ lastDispatchedTags = new int[childOperatorsArray.length];
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ lastDispatchedRows[i] = null;
+ lastDispatchedTags[i] = -1;
+ }
+
+ initializeChildren(hconf);
+ }
+
+  // Each child should have its own outputObjInspector
+ @Override
+ protected void initializeChildren(Configuration hconf) throws HiveException {
+ state = State.INIT;
+ LOG.info("Operator " + id + " " + getName() + " initialized");
+ if (childOperators == null) {
+ return;
+ }
+ LOG.info("Initializing children of " + id + " " + getName());
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " +
+ childOperatorsArray[i].getName() +
+ " " + childInputObjInspectors.get(i).length);
+ childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i));
+ if (reporter != null) {
+ childOperatorsArray[i].setReporter(reporter);
+ }
+ }
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+    List<Object> thisRow = (List<Object>) row;
+ assert thisRow.size() == 4;
+ operationPathTag = ((ByteWritable) thisRow.get(3)).get();
+ inputTag = ((ByteWritable) thisRow.get(2)).get();
+ forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]);
+ }
+
+ @Override
+ public void forward(Object row, ObjectInspector rowInspector)
+ throws HiveException {
+ if ((++outputRows % 1000) == 0) {
+ if (counterNameToEnum != null) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ outputRows = 0;
+ }
+ }
+
+ if (childOperatorsArray == null && childOperators != null) {
+ throw new HiveException("Internal Hive error during operator initialization.");
+ }
+
+ if ((childOperatorsArray == null) || (getDone())) {
+ return;
+ }
+
+ int childrenDone = 0;
+ int forwardFlag = 1;
+ assert childOperatorsArray.length <= 8;
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+      Operator<? extends OperatorDesc> o = childOperatorsArray[i];
+ if (o.getDone()) {
+ childrenDone++;
+ } else {
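+        // Bit i of operationPathTag is set when operation path i forwarded this row on the map
+        // side (see ReduceSinkOperator.setOperationPathTags); only dispatch the row to child i
+        // when its bit is set.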
+ int isProcess = (operationPathTag & (forwardFlag << i));
+ if (isProcess != 0) {
+ if (o.getName().equals(GroupByOperator.getOperatorName())) {
+ GroupByOperator gbyop = (GroupByOperator) o;
+ gbyop.setForcedForward(false);
+ if (!this.bytesWritableGroupKey.equals(o.getBytesWritableGroupKey())) {
+ o.setBytesWritableGroupKey(this.bytesWritableGroupKey);
+ }
+ }
+ for (int j = 0; j < dispatchHandlers.get(inputTag).get(i).size(); j++) {
+ Object dispatchedRow = dispatchHandlers.get(inputTag).get(i).get(j).process(row);
+ int dispatchedTag = dispatchConf.get(inputTag).get(i).get(j);
+ o.process(dispatchedRow, dispatchedTag);
+ lastDispatchedRows[i] = dispatchedRow;
+ lastDispatchedTags[i] = dispatchedTag;
+ }
+ }
+ if (isProcess == 0 && o.getName().equals(GroupByOperator.getOperatorName())) {
+ if (lastDispatchedRows[i] != null &&
+ !this.bytesWritableGroupKey.equals(o.getBytesWritableGroupKey())) {
+ GroupByOperator gbyop = (GroupByOperator) o;
+ gbyop.setForcedForward(true);
+ o.setBytesWritableGroupKey(this.bytesWritableGroupKey);
+ o.process(lastDispatchedRows[i], lastDispatchedTags[i]);
+ }
+ }
+ }
+ }
+
+ // if all children are done, this operator is also done
+ if (childrenDone == childOperatorsArray.length) {
+ setDone(true);
+ }
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ // log the number of rows forwarded from each dispatcherHandler
+    for (Map<Integer, List<DispatchHandler>> childIndx2DispatchHandlers : dispatchHandlers
+        .values()) {
+      for (List<DispatchHandler> dispatchHandlers : childIndx2DispatchHandlers.values()) {
+ for (DispatchHandler dispatchHandler : dispatchHandlers) {
+ dispatchHandler.printCloseOpLog();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void setGroupKeyObject(Object keyObject) {
+ this.groupKeyObject = keyObject;
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+ op.setGroupKeyObject(keyObject);
+ }
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CDP";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.CORRELATIONREDUCERDISPATCH;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
index 18a9bd2..b37f554 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
@@ -25,6 +25,7 @@ import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,6 +62,7 @@ public class ExecReducer extends MapReduceBase implements Reducer {
private Reporter rp;
private boolean abort = false;
private boolean isTagged = false;
+ private boolean isOperationPathTagged = false;
private long cntr = 0;
private long nextCntr = 1;
@@ -116,6 +118,7 @@ public class ExecReducer extends MapReduceBase implements Reducer {
reducer.setParentOperators(null); // clear out any parents as reducer is the
// root
isTagged = gWork.getNeedsTagging();
+ isOperationPathTagged = gWork.getNeedsOperationPathTagging();
try {
keyTableDesc = gWork.getKeyDesc();
inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
@@ -164,8 +167,9 @@ public class ExecReducer extends MapReduceBase implements Reducer {
private BytesWritable groupKey;
-  ArrayList<Object> row = new ArrayList<Object>(3);
+  List<Object> row = new ArrayList<Object>(4);
ByteWritable tag = new ByteWritable();
+ ByteWritable operationPathTags = new ByteWritable();
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
@@ -190,6 +194,14 @@ public class ExecReducer extends MapReduceBase implements Reducer {
keyWritable.setSize(size);
}
+ operationPathTags.set((byte)0);
+ if (isOperationPathTagged) {
+      // remove the operation path tag
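+      // ReduceSinkOperator appends the operation path tag byte just before the reduce tag byte,
+      // so it is the last byte left in the key after the reduce tag has been removed above.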
+ int size = keyWritable.getSize() - 1;
+ operationPathTags.set(keyWritable.get()[size]);
+ keyWritable.setSize(size);
+ }
+
if (!keyWritable.equals(groupKey)) {
// If a operator wants to do some work at the beginning of a group
if (groupKey == null) { // the first group
@@ -214,6 +226,7 @@ public class ExecReducer extends MapReduceBase implements Reducer {
l4j.trace("Start Group");
reducer.startGroup();
reducer.setGroupKeyObject(keyObject);
+ reducer.setBytesWritableGroupKey(groupKey);
}
// System.err.print(keyObject.toString());
while (values.hasNext()) {
@@ -236,6 +249,7 @@ public class ExecReducer extends MapReduceBase implements Reducer {
row.add(valueObject[tag.get()]);
// The tag is not used any more, we should remove it.
row.add(tag);
+ row.add(operationPathTags);
if (isLogInfoEnabled) {
cntr++;
if (cntr == nextCntr) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 652d81c..1fa7936 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -144,6 +144,13 @@ public class GroupByOperator extends Operator implements
private long maxMemory;
private float memoryThreshold;
+  private boolean forcedForward; // only used by CorrelationReducerDispatchOperator to make this
+                                 // GroupByOperator keep the same pace as other GroupByOperators
+                                 // and JoinOperators. If true and newKeys differs from
+                                 // currentKeys, the data associated with currentKeys will be
+                                 // forwarded; otherwise, nothing happens.
+
/**
* This is used to store the position and field names for variable length
* fields.
@@ -385,6 +392,7 @@ public class GroupByOperator extends Operator implements
memoryMXBean = ManagementFactory.getMemoryMXBean();
maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
memoryThreshold = this.getConf().getMemoryThreshold();
+ forcedForward = false;
initializeChildren(hconf);
}
@@ -793,6 +801,10 @@ public class GroupByOperator extends Operator implements
}
}
+ public void setForcedForward(boolean forcedForward) {
+ this.forcedForward = forcedForward;
+ }
+
// Non-hash aggregation
private void processAggr(Object row, ObjectInspector rowInspector,
KeyWrapper newKeys) throws HiveException {
@@ -806,11 +818,16 @@ public class GroupByOperator extends Operator implements
newKeys.equals(currentKeys) : false;
// Forward the current keys if needed for sort-based aggregation
- if (currentKeys != null && !keysAreEqual) {
+ if (currentKeys != null && (!keysAreEqual || forcedForward)) {
forward(currentKeys.getKeyArray(), aggregations);
countAfterReport = 0;
}
+ if (forcedForward) {
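+      // A forced forward has just flushed the buffered group above; drop currentKeys so the same
+      // group is not forwarded again and skip normal aggregation for this call.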
+ currentKeys = null;
+ return;
+ }
+
// Need to update the keys?
if (currentKeys == null || !keysAreEqual) {
if (currentKeys == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 587fe33..46aa3bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.OutputCollector;
@@ -1420,4 +1421,52 @@ public abstract class Operator implements Serializable,C
public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
}
+
+ // bytesWritableGroupKey is only used when a query plan is optimized by CorrelationOptimizer.
+ // CorrelationLocalSimulativeReduceSinkOperator will use this variable to determine when it needs to start or end the group
+ // for its child operator.
+ protected BytesWritable bytesWritableGroupKey;
+
+ public void setBytesWritableGroupKey(BytesWritable groupKey) {
+ if (bytesWritableGroupKey == null) {
+ bytesWritableGroupKey = new BytesWritable();
+ }
+ bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize());
+ }
+
+ public BytesWritable getBytesWritableGroupKey() {
+ return bytesWritableGroupKey;
+ }
+
+  // The number of the current row
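+  // (CorrelationCompositeOperator overrides setRowNumber to flush its buffered row whenever the
+  // map-side row number advances.)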
+ protected long rowNumber;
+
+ public void initializeRowNumber() {
+ this.rowNumber = 0L;
+ LOG.info("Operator " + id + " " + getName() + " row number initialized to 0");
+ if (childOperators == null) {
+ return;
+ }
+ LOG.info("Initializing row numbers of children of " + id + " " + getName());
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ childOperatorsArray[i].initializeRowNumber();
+ }
+ }
+
+ public void setRowNumber(long rowNumber) throws HiveException {
+ this.rowNumber = rowNumber;
+ if (childOperators == null) {
+ return;
+ }
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ assert rowNumber >= childOperatorsArray[i].getRowNumber();
+ if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+ childOperatorsArray[i].setRowNumber(rowNumber);
+ }
+ }
+ }
+
+ public long getRowNumber() {
+ return rowNumber;
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index 0c22141..064afc9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -22,6 +22,9 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -91,6 +94,12 @@ public final class OperatorFactory {
HashTableDummyOperator.class));
    opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
        HashTableSinkOperator.class));
+    opvec.add(new OpTuple<CorrelationCompositeDesc>(CorrelationCompositeDesc.class,
+        CorrelationCompositeOperator.class));
+    opvec.add(new OpTuple<CorrelationReducerDispatchDesc>(CorrelationReducerDispatchDesc.class,
+        CorrelationReducerDispatchOperator.class));
+    opvec.add(new OpTuple<CorrelationLocalSimulativeReduceSinkDesc>(
+        CorrelationLocalSimulativeReduceSinkDesc.class,
+        CorrelationLocalSimulativeReduceSinkOperator.class));
}
public static Operator get(Class opClass) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 919a140..899dd9c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -21,179 +21,50 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
/**
* Reduce Sink Operator sends output to the reduce stage.
**/
-public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
+public class ReduceSinkOperator extends BaseReduceSinkOperator<ReduceSinkDesc>
implements Serializable {
private static final long serialVersionUID = 1L;
- /**
- * The evaluators for the key columns. Key columns decide the sort order on
- * the reducer side. Key columns are passed to the reducer in the "key".
- */
- protected transient ExprNodeEvaluator[] keyEval;
- /**
- * The evaluators for the value columns. Value columns are passed to reducer
- * in the "value".
- */
- protected transient ExprNodeEvaluator[] valueEval;
- /**
- * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
- * Hive language). Partition columns decide the reducer that the current row
- * goes to. Partition columns are not passed to reducer.
- */
- protected transient ExprNodeEvaluator[] partitionEval;
-
- // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
- // ready
- transient Serializer keySerializer;
- transient boolean keyIsText;
- transient Serializer valueSerializer;
- transient int tag;
- transient byte[] tagByte = new byte[1];
- transient protected int numDistributionKeys;
- transient protected int numDistinctExprs;
-
- @Override
- protected void initializeOp(Configuration hconf) throws HiveException {
-
- try {
- keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
- int i = 0;
- for (ExprNodeDesc e : conf.getKeyCols()) {
- keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- numDistributionKeys = conf.getNumDistributionKeys();
- distinctColIndices = conf.getDistinctColumnIndices();
- numDistinctExprs = distinctColIndices.size();
-
- valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
- i = 0;
- for (ExprNodeDesc e : conf.getValueCols()) {
- valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
- i = 0;
- for (ExprNodeDesc e : conf.getPartitionCols()) {
- partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- tag = conf.getTag();
- tagByte[0] = (byte) tag;
- LOG.info("Using tag = " + tag);
+  private final List<Integer> operationPathTags = new ArrayList<Integer>(); // operation path tags
+ private final byte[] operationPathTagsByte = new byte[1];
- TableDesc keyTableDesc = conf.getKeySerializeInfo();
- keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
- .newInstance();
- keySerializer.initialize(null, keyTableDesc.getProperties());
- keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
- TableDesc valueTableDesc = conf.getValueSerializeInfo();
- valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
- .newInstance();
- valueSerializer.initialize(null, valueTableDesc.getProperties());
-
- firstRow = true;
- initializeChildren(hconf);
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
+  public void setOperationPathTags(List<Integer> operationPathTags) {
+ this.operationPathTags.addAll(operationPathTags);
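+    // Encode the forwarding operation paths into a single byte as a bitmask: bit i is set when
+    // operation path i forwarded this row, which is why at most 8 operation paths are supported.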
+ int operationPathTagsInt = 0;
+ int tmp = 1;
+ for (Integer operationPathTag: operationPathTags) {
+ operationPathTagsInt += tmp << operationPathTag.intValue();
}
+ operationPathTagsByte[0] = (byte) operationPathTagsInt;
}
- transient InspectableObject tempInspectableObject = new InspectableObject();
- transient HiveKey keyWritable = new HiveKey();
- transient Writable value;
-
- transient StructObjectInspector keyObjectInspector;
- transient StructObjectInspector valueObjectInspector;
- transient ObjectInspector[] partitionObjectInspectors;
-
- transient Object[][] cachedKeys;
- transient Object[] cachedValues;
- transient List<List<Integer>> distinctColIndices;
-
- boolean firstRow;
-
- transient Random random;
-
- /**
- * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
- * column indices for group by.
- * Puts the return values into a StructObjectInspector with output column
- * names.
- *
- * If distinctColIndices is empty, the object inspector is same as
- * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
- */
- protected static StructObjectInspector initEvaluatorsAndReturnStruct(
- ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
- List<String> outputColNames,
- int length, ObjectInspector rowInspector)
- throws HiveException {
- int inspectorLen = evals.length > length ? length + 1 : evals.length;
- List<ObjectInspector> sois = new ArrayList<ObjectInspector>(inspectorLen);
-
- // keys
- ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
- sois.addAll(Arrays.asList(fieldObjectInspectors));
-
- if (evals.length > length) {
- // union keys
- List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
- for (List<Integer> distinctCols : distinctColIndices) {
- List<String> names = new ArrayList<String>();
- List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
- int numExprs = 0;
- for (int i : distinctCols) {
- names.add(HiveConf.getColumnInternalName(numExprs));
- eois.add(evals[i].initialize(rowInspector));
- numExprs++;
- }
- uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
- }
- UnionObjectInspector uoi =
- ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
- sois.add(uoi);
- }
- return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois );
+ public List<Integer> getOperationPathTags() {
+ return this.operationPathTags;
}
@Override
public void processOp(Object row, int tag) throws HiveException {
try {
ObjectInspector rowInspector = inputObjInspectors[tag];
- if (firstRow) {
- firstRow = false;
+ if (isFirstRow) {
+ isFirstRow = false;
keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
distinctColIndices,
conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -267,9 +138,18 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
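+ // When operation path tagging is needed, the key carries two trailing tag bytes
+ // ([operation path tags byte][input tag byte]); otherwise only the input tag byte is appended.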
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.setSize(keyLength + 1);
+ } else {
+ keyWritable.setSize(keyLength + 2);
+ }
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.get()[keyLength] = tagByte[0];
+ } else {
+ keyWritable.get()[keyLength] = operationPathTagsByte[0];
+ keyWritable.get()[keyLength + 1] = tagByte[0];
+ }
}
} else {
// Must be BytesWritable
@@ -279,9 +159,18 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.setSize(keyLength + 1);
+ } else {
+ keyWritable.setSize(keyLength + 2);
+ }
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.get()[keyLength] = tagByte[0];
+ } else {
+ keyWritable.get()[keyLength] = operationPathTagsByte[0];
+ keyWritable.get()[keyLength + 1] = tagByte[0];
+ }
}
}
keyWritable.setHashCode(keyHashCode);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 1469325..6f9b62c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -80,6 +80,9 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
if (conf != null && conf.isGatherStats()) {
gatherStats(row);
}
+ if (conf != null && conf.isForwardRowNumber()) {
+ setRowNumber(rowNumber+1);
+ }
forward(row, inputObjInspectors[tag]);
}
@@ -169,6 +172,12 @@ public class TableScanOperator extends Operator implements
if (conf == null) {
return;
}
+
+ LOG.info(this.getName() + " forward row number " + conf.isForwardRowNumber());
+ if (conf.isForwardRowNumber()) {
+ initializeRowNumber();
+ }
+
if (!conf.isGatherStats()) {
return;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
new file mode 100644
index 0000000..06d1486
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
@@ -0,0 +1,957 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBExpr;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * Implementation of correlation optimizer. The optimization is based on
+ * the paper "YSmart: Yet Another SQL-to-MapReduce Translator"
+ * (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang)
+ * (http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-7.pdf).
+ * This optimizer first detects three kinds of
+ * correlations, Input Correlation (IC), Transit Correlation (TC) and Job Flow Correlation (JFC),
+ * and then merges correlated MapReduce-jobs (MR-jobs) into one MR-job.
+ *
+ * For the definitions of correlations, see the original paper of YSmart.
+ *
+ * Rules for merging correlated MR-jobs implemented in this correlation
+ * optimizer are:
+ * 1. If an MR-job for a Join operation has the same partitioning keys as all of its
+ * preceding MR-jobs, the correlation optimizer merges these MR-jobs into one MR-job.
+ * 2. If an MR-job for a GroupBy and Aggregation operation has the same partitioning keys
+ * as its preceding MR-job, the correlation optimizer merges these two MR-jobs into one MR-job.
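+ *
+ * For example (for illustration only; table names t1 and t2 are hypothetical), in the query
+ *   SELECT t1.key, count(*) FROM t1 JOIN t2 ON (t1.key = t2.key) GROUP BY t1.key;
+ * the MR-job of the join and the MR-job of the group-by partition their data on the same
+ * key (t1.key), so rule 2 allows the two MR-jobs to be evaluated in a single MR-job.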
+ *
+ * Note: In the current implementation, if the correlation optimizer detects that the MR-jobs of a
+ * sub-plan tree are correlated, it transforms this sub-plan tree into a single MR-job when the
+ * input of this sub-plan tree is not a temporary table. Otherwise, the current implementation
+ * ignores this sub-plan tree.
+ *
+ * There are several pieces of future work that can enhance the correlation optimizer.
+ * Here are four examples:
+ * 1. Add a new rule: if two MR-jobs share the same
+ * partitioning keys and have common input tables, merge these two MR-jobs into a single
+ * MR-job.
+ * 2. The current implementation detects MR-jobs which have the same partitioning keys
+ * as correlated MR-jobs. However, the condition of same partitioning keys can be relaxed to use
+ * common partitioning keys.
+ * 3. The current implementation cannot optimize MR-jobs for the
+ * aggregation functions with a distinct keyword, which should be supported in the future
+ * implementation of the correlation optimizer.
+ * 4. Optimize queries involving self-join.
+ */
+
+public class CorrelationOptimizer implements Transform {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName());
+ private final Map<String, String> aliastoTabName;
+ private final Map<String, Table> aliastoTab;
+
+ public CorrelationOptimizer() {
+ super();
+ aliastoTabName = new HashMap<String, String>();
+ aliastoTab = new HashMap<String, Table>();
+ pGraphContext = null;
+ }
+
+ private boolean initializeAliastoTabNameMapping(QB qb) {
+ // If any sub-query's qb is null, CorrelationOptimizer will not optimize this query.
+ // e.g. auto_join27.q
+ if (qb == null) {
+ return false;
+ }
+ boolean ret = true;
+ for (String alias : qb.getAliases()) {
+ aliastoTabName.put(alias, qb.getTabNameForAlias(alias));
+ aliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias));
+ }
+ for (String subqalias : qb.getSubqAliases()) {
+ QBExpr qbexpr = qb.getSubqForAlias(subqalias);
+ ret = ret && initializeAliastoTabNameMapping(qbexpr.getQB());
+ }
+ return ret;
+ }
+
+ protected ParseContext pGraphContext;
+ private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
+
+ private boolean abort = false;
+
+ private Map<ReduceSinkOperator, GroupByOperator> groupbyNonMapSide2MapSide;
+ private Map<GroupByOperator, ReduceSinkOperator> groupbyMapSide2NonMapSide;
+
+ /**
+ * Transform the query tree.
+ *
+ * @param pctx
+ * current parse context
+ * @throws SemanticException
+ */
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+ pGraphContext = pctx;
+ opParseCtx = pctx.getOpParseCtx();
+
+ groupbyNonMapSide2MapSide = pctx.getGroupbyNonMapSide2MapSide();
+ groupbyMapSide2NonMapSide = pctx.getGroupbyMapSide2NonMapSide();
+
+ QB qb = pGraphContext.getQB();
+ abort = !initializeAliastoTabNameMapping(qb);
+ if (abort) {
+ LOG.info("Abort. Reasons are ...");
+ LOG.info("-- This query or its sub-queries has a null qb.");
+ return pGraphContext;
+ }
+
+ // 0: Replace every map-side group by pattern (GBY-RS-GBY) with the
+ // non-map-side group by pattern (RS-GBY) if necessary
+ if (pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ for (Entry<GroupByOperator, ReduceSinkOperator> entry : groupbyMapSide2NonMapSide.entrySet()) {
+ GroupByOperator mapSidePatternStart = entry.getKey();
+ Operator<? extends OperatorDesc> op1 = mapSidePatternStart.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> op2 = op1.getChildOperators().get(0);
+ if (!(op1 instanceof ReduceSinkOperator && op2 instanceof GroupByOperator)) {
+ LOG.info("Abort. Reasons are ...");
+ LOG.info("-- This plan has been converted to a plan involving map-only groupby");
+ // e.g. test query groupby_sort_1.q, which is introduced in HIVE-3432.
+ return pGraphContext;
+ }
+ }
+
+ for (Entry<GroupByOperator, ReduceSinkOperator> entry : groupbyMapSide2NonMapSide.entrySet()) {
+ GroupByOperator mapSidePatternStart = entry.getKey();
+ GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart
+ .getChildOperators().get(0).getChildOperators().get(0);
+ ReduceSinkOperator nonMapSidePatternStart = entry.getValue();
+ GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart
+ .getChildOperators().get(0);
+
+ List<Operator<? extends OperatorDesc>> parents = mapSidePatternStart.getParentOperators();
+ List<Operator<? extends OperatorDesc>> children = mapSidePatternEnd.getChildOperators();
+
+ nonMapSidePatternStart.setParentOperators(parents);
+ nonMapSidePatternEnd.setChildOperators(children);
+
+ for (Operator<? extends OperatorDesc> parent : parents) {
+ parent.replaceChild(mapSidePatternStart, nonMapSidePatternStart);
+ }
+ for (Operator<? extends OperatorDesc> child : children) {
+ child.replaceParent(mapSidePatternEnd, nonMapSidePatternEnd);
+ }
+ }
+ }
+
+ // 1: detect correlations
+ CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx();
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"),
+ new CorrelationNodeProc());
+
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topOp nodes
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ abort = correlationCtx.isAbort();
+ int correlationsAppliedCount = 0;
+ if (abort) {
+ LOG.info("Abort. Reasons are ...");
+ for (String reason : correlationCtx.getAbortReasons()) {
+ LOG.info("-- " + reason);
+ }
+ } else {
+ // 2: transform the query plan tree
+ LOG.info("Begain query plan transformation based on intra-query correlations. " +
+ correlationCtx.getCorrelations().size() + " correlation(s) to be applied");
+ for (IntraQueryCorrelation correlation : correlationCtx.getCorrelations()) {
+ boolean ret = CorrelationOptimizerUtils.applyCorrelation(
+ correlation, pGraphContext, groupbyNonMapSide2MapSide, opParseCtx);
+ if (ret) {
+ correlationsAppliedCount++;
+ }
+ }
+ }
+
+ // 3: if no correlation was applied, replace every non-map-side group by pattern (RS-GBY) with
+ // the map-side group by pattern (GBY-RS-GBY) if necessary
+ if (correlationsAppliedCount == 0 &&
+ pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ for (Entry<ReduceSinkOperator, GroupByOperator> entry : groupbyNonMapSide2MapSide.entrySet()) {
+ GroupByOperator mapSidePatternStart = entry.getValue();
+ GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart
+ .getChildOperators().get(0).getChildOperators().get(0);
+ ReduceSinkOperator nonMapSidePatternStart = entry.getKey();
+ GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart
+ .getChildOperators().get(0);
+
+ List<Operator<? extends OperatorDesc>> parents = nonMapSidePatternStart
+ .getParentOperators();
+ List<Operator<? extends OperatorDesc>> children = nonMapSidePatternEnd.getChildOperators();
+
+ mapSidePatternStart.setParentOperators(parents);
+ mapSidePatternEnd.setChildOperators(children);
+
+ for (Operator<? extends OperatorDesc> parent : parents) {
+ parent.replaceChild(nonMapSidePatternStart, mapSidePatternStart);
+ }
+ for (Operator<? extends OperatorDesc> child : children) {
+ child.replaceParent(nonMapSidePatternEnd, mapSidePatternEnd);
+ }
+ }
+ }
+ LOG.info("Finish query plan transformation based on intra-query correlations. " +
+ correlationsAppliedCount + " correlation(s) actually be applied");
+ return pGraphContext;
+ }
+
+ private class CorrelationNodeProc implements NodeProcessor {
+
+
+ /**
+ * Find all upstream (close to FileSinkOperator) ReduceSinkOperators starting from
+ * input Operators
+ *
+ * @param ops
+ * Operators starting the search
+ * @return the ReduceSinkOperators found by the search
+ */
+ public List<Operator<? extends OperatorDesc>> findUpstreamReduceSinkOperators(
+ List<Operator<? extends OperatorDesc>> ops) {
+ List<Operator<? extends OperatorDesc>> downstreamReduceSinkOperators =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Operator<? extends OperatorDesc> op : ops) {
+ if (op.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ downstreamReduceSinkOperators.add(op);
+ } else if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+ continue;
+ } else {
+ downstreamReduceSinkOperators.addAll(findUpstreamReduceSinkOperators(
+ op.getChildOperators()));
+ }
+ }
+
+ return downstreamReduceSinkOperators;
+ }
+
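+ // Starting from one parent ReduceSinkOperator of a JoinOperator, collect into correlatedRsOps
+ // every parent ReduceSinkOperator reachable through an INNER, LEFT OUTER, or RIGHT OUTER
+ // join condition.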
+ private void analyzeReduceSinkOperatorsOfJoinOperator(JoinCondDesc[] joinConds,
+ List<Operator<? extends OperatorDesc>> rsOps, Operator<? extends OperatorDesc> currentRsOps,
+ Set<ReduceSinkOperator> correlatedRsOps) {
+ if (correlatedRsOps.contains((ReduceSinkOperator) currentRsOps)) {
+ return;
+ }
+
+ correlatedRsOps.add((ReduceSinkOperator) currentRsOps);
+
+ int pos = rsOps.indexOf(currentRsOps);
+ for (int i = 0; i < joinConds.length; i++) {
+ JoinCondDesc joinCond = joinConds[i];
+ int type = joinCond.getType();
+ if (pos == joinCond.getLeft()) {
+ if (type == JoinDesc.INNER_JOIN || type == JoinDesc.LEFT_OUTER_JOIN) {
+ Operator<? extends OperatorDesc> newCurrentRsOps = rsOps.get(joinCond.getRight());
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, newCurrentRsOps,
+ correlatedRsOps);
+ }
+ } else if (pos == joinCond.getRight()) {
+ if (type == JoinDesc.INNER_JOIN || type == JoinDesc.RIGHT_OUTER_JOIN) {
+ Operator<? extends OperatorDesc> newCurrentRsOps = rsOps.get(joinCond.getLeft());
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, newCurrentRsOps,
+ correlatedRsOps);
+ }
+ }
+ }
+ }
+
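+ // Walk up the plan from op, mapping the given key columns back through each operator's
+ // column expression map, and return the ReduceSinkOperators whose own key columns match the
+ // mapped key columns; an empty set means no correlated ReduceSinkOperator was found.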
+ private Set<ReduceSinkOperator> findCorrelatedReduceSinkOperators(
+ Operator<? extends OperatorDesc> op, Set<String> keyColumns,
+ IntraQueryCorrelation correlation) throws SemanticException {
+
+ LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName());
+
+ Set<ReduceSinkOperator> correlatedReduceSinkOperators = new HashSet<ReduceSinkOperator>();
+ if (op.getParentOperators() == null) {
+ return correlatedReduceSinkOperators;
+ }
+ if (op.getColumnExprMap() == null && !(op instanceof ReduceSinkOperator)) {
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ correlatedReduceSinkOperators.addAll(findCorrelatedReduceSinkOperators(
+ parent, keyColumns, correlation));
+ }
+ } else if (op.getColumnExprMap() != null && !(op instanceof ReduceSinkOperator)) {
+ Set<String> newKeyColumns = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ ExprNodeDesc col = op.getColumnExprMap().get(keyColumn);
+ if (col instanceof ExprNodeColumnDesc) {
+ newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn());
+ }
+ }
+
+ if (op.getName().equals(CommonJoinOperator.getOperatorName())) {
+ Set<String> tableNeedToCheck = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ for (ColumnInfo cinfo : opParseCtx.get(op).getRowResolver().getColumnInfos()) {
+ if (keyColumn.equals(cinfo.getInternalName())) {
+ tableNeedToCheck.add(cinfo.getTabAlias());
+ }
+ }
+ }
+ Set<ReduceSinkOperator> correlatedRsOps = new HashSet<ReduceSinkOperator>();
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ Set<String> tableNames =
+ opParseCtx.get(parent).getRowResolver().getTableNames();
+ for (String tbl : tableNames) {
+ if (tableNeedToCheck.contains(tbl)) {
+ correlatedRsOps.addAll(findCorrelatedReduceSinkOperators(parent,
+ newKeyColumns, correlation));
+ }
+ }
+ }
+
+ // Right now, if any ReduceSinkOperator of this JoinOperator is not correlated, we will
+ // not optimize this query
+ if (correlatedRsOps.size() == op.getParentOperators().size()) {
+ correlatedReduceSinkOperators.addAll(correlatedRsOps);
+ } else {
+ correlatedReduceSinkOperators.clear();
+ }
+ } else {
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ correlatedReduceSinkOperators.addAll(findCorrelatedReduceSinkOperators(
+ parent, newKeyColumns, correlation));
+ }
+ }
+ } else if (op.getColumnExprMap() != null && op instanceof ReduceSinkOperator) {
+ Set<String> newKeyColumns = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ ExprNodeDesc col = op.getColumnExprMap().get(keyColumn);
+ if (col instanceof ExprNodeColumnDesc) {
+ newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn());
+ }
+ }
+
+ ReduceSinkOperator rsop = (ReduceSinkOperator) op;
+ Set<String> thisKeyColumns = new HashSet<String>();
+ for (ExprNodeDesc key : rsop.getConf().getKeyCols()) {
+ if (key instanceof ExprNodeColumnDesc) {
+ thisKeyColumns.add(((ExprNodeColumnDesc) key).getColumn());
+ }
+ }
+
+ boolean isCorrelated = false;
+ Set<String> intersection = new HashSet<String>(newKeyColumns);
+ intersection.retainAll(thisKeyColumns);
+ // TODO: relax the condition to handle more cases
+ isCorrelated = (!intersection.isEmpty() &&
+ intersection.size() == thisKeyColumns.size() &&
+ intersection.size() == newKeyColumns.size());
+
+
+ if (isCorrelated) {
+ List<Operator<? extends OperatorDesc>> upstreamReduceSinkOperators =
+ findUpstreamReduceSinkOperators(rsop.getChildOperators());
+ // upstreamReduceSinkOperators will not be empty because rsop is not a
+ // ReduceSinkOperator that is nearest to the FileSinkOperator
+ assert upstreamReduceSinkOperators.size() != 0;
+ for (Operator<? extends OperatorDesc> dsRSop : upstreamReduceSinkOperators) {
+ assert dsRSop instanceof ReduceSinkOperator;
+ if (intersection.size() != ((ReduceSinkOperator) dsRSop).getConf().getKeyCols().size()) {
+ isCorrelated = false;
+ }
+ }
+ }
+
+ if (isCorrelated) {
+ LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is correlated");
+ LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+ LOG.info("--keys of child operator: " + keyColumns.toString());
+ LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+ if (((Operator<? extends OperatorDesc>) (op.getChildOperators().get(0))).getName()
+ .equals(CommonJoinOperator.getOperatorName())) {
+ JoinOperator joinOp = (JoinOperator) op.getChildOperators().get(0);
+ JoinCondDesc[] joinConds = joinOp.getConf().getConds();
+ List<Operator<? extends OperatorDesc>> rsOps = joinOp.getParentOperators();
+ Set<ReduceSinkOperator> correlatedRsOps = new HashSet<ReduceSinkOperator>();
+ analyzeReduceSinkOperatorsOfJoinOperator(joinConds, rsOps, op, correlatedRsOps);
+ correlatedReduceSinkOperators.addAll(correlatedRsOps);
+ } else {
+ correlatedReduceSinkOperators.add(rsop);
+ }
+ } else {
+ LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is not correlated");
+ LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+ LOG.info("--keys of child operator: " + keyColumns.toString());
+ LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+ correlatedReduceSinkOperators.clear();
+ }
+ } else {
+ LOG.error("ReduceSinkOperator " + op.getIdentifier() + " does not have ColumnExprMap");
+ throw new SemanticException("CorrelationOptimizer cannot optimize this plan. " +
+ "ReduceSinkOperator " + op.getIdentifier()
+ + " does not have ColumnExprMap");
+ }
+ return correlatedReduceSinkOperators;
+ }
+
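+ // Recursively detect job flow correlation starting from op, recording correlated
+ // ReduceSinkOperators in the given correlation; returns the ReduceSinkOperators at the bottom
+ // of the detected correlated chains (op itself when no correlated ReduceSinkOperator is found).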
+ private Set<ReduceSinkOperator> exploitJFC(ReduceSinkOperator op,
+ CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation)
+ throws SemanticException {
+
+ correlationCtx.addWalked(op);
+ correlation.addToAllReduceSinkOperators(op);
+
+ Set<ReduceSinkOperator> reduceSinkOperators = new HashSet<ReduceSinkOperator>();
+
+ boolean shouldDetect = true;
+
+ List<ExprNodeDesc> keys = op.getConf().getKeyCols();
+ Set<String> keyColumns = new HashSet<String>();
+ for (ExprNodeDesc key : keys) {
+ if (!(key instanceof ExprNodeColumnDesc)) {
+ shouldDetect = false;
+ } else {
+ keyColumns.add(((ExprNodeColumnDesc) key).getColumn());
+ }
+ }
+
+ if (shouldDetect) {
+ Set<ReduceSinkOperator> newReduceSinkOperators = new HashSet<ReduceSinkOperator>();
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ LOG.info("Operator " + op.getIdentifier()
+ + ": start detecting correlation from this operator");
+ LOG.info("--keys of this operator: " + keyColumns.toString());
+ Set<ReduceSinkOperator> correlatedReduceSinkOperators =
+ findCorrelatedReduceSinkOperators(parent, keyColumns, correlation);
+ if (correlatedReduceSinkOperators.size() == 0) {
+ newReduceSinkOperators.add(op);
+ } else {
+ for (ReduceSinkOperator rsop : correlatedReduceSinkOperators) {
+
+ // For two ReduceSinkOperators, we say the one closer to the FileSinkOperators is up and
+ // the other one is down
+
+ if (!correlation.getUpstreamToDownstreamRSops().containsKey(op)) {
+ correlation.getUpstreamToDownstreamRSops().put(op,
+ new ArrayList<ReduceSinkOperator>());
+ }
+ correlation.getUpstreamToDownstreamRSops().get(op).add(rsop);
+
+ if (!correlation.getDownstreamToUpStreamRSops().containsKey(rsop)) {
+ correlation.getDownstreamToUpStreamRSops().put(rsop,
+ new ArrayList<ReduceSinkOperator>());
+ }
+ correlation.getDownstreamToUpStreamRSops().get(rsop).add(op);
+ Set<ReduceSinkOperator> exploited = exploitJFC(rsop, correlationCtx,
+ correlation);
+ if (exploited.size() == 0) {
+ newReduceSinkOperators.add(rsop);
+ } else {
+ newReduceSinkOperators.addAll(exploited);
+ }
+ }
+ }
+ }
+ reduceSinkOperators.addAll(newReduceSinkOperators);
+ }
+ return reduceSinkOperators;
+ }
+
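+ // Walk up the single-parent chain from startPoint and return the first TableScanOperator
+ // found; return null if a ReduceSinkOperator is reached first or the chain ends.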
+ private TableScanOperator findTableScanOperator(Operator<? extends OperatorDesc> startPoint) {
+ Operator<? extends OperatorDesc> thisOp = startPoint.getParentOperators().get(0);
+ while (true) {
+ if (thisOp.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ return null;
+ } else if (thisOp.getName().equals(TableScanOperator.getOperatorName())) {
+ return (TableScanOperator) thisOp;
+ } else {
+ if (thisOp.getParentOperators() != null) {
+ thisOp = thisOp.getParentOperators().get(0);
+ } else {
+ break;
+ }
+ }
+ }
+ return null;
+ }
+
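+ // Assign an operation path index to every bottom ReduceSinkOperator of the correlation;
+ // peer ReduceSinkOperators that are also bottom operators share the same index.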
+ private void annotateOpPlan(IntraQueryCorrelation correlation) {
+ Map<ReduceSinkOperator, Integer> bottomReduceSink2OperationPath =
+ new HashMap<ReduceSinkOperator, Integer>();
+ int indx = 0;
+ for (ReduceSinkOperator rsop : correlation.getBottomReduceSinkOperators()) {
+ if (!bottomReduceSink2OperationPath.containsKey(rsop)) {
+ bottomReduceSink2OperationPath.put(rsop, indx);
+ for (ReduceSinkOperator peerRSop : CorrelationOptimizerUtils
+ .findPeerReduceSinkOperators(rsop)) {
+ if (correlation.getBottomReduceSinkOperators().contains(peerRSop)) {
+ bottomReduceSink2OperationPath.put(peerRSop, indx);
+ }
+ }
+ indx++;
+ }
+ }
+ correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OperationPath);
+ }
+
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+
+ if (correlationCtx.isWalked(op)) {
+ return null;
+ }
+
+ LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " "
+ + ((Operator) nd).getName());
+
+ if (op.getConf().getKeyCols().size() == 0 ||
+ (!op.getChildOperators().get(0).getName().equals(CommonJoinOperator.getOperatorName()) &&
+ !op.getChildOperators().get(0).getName().equals(GroupByOperator.getOperatorName()))) {
+ correlationCtx.addWalked(op);
+ return null;
+ }
+
+ // 1: find out correlation
+ IntraQueryCorrelation correlation = new IntraQueryCorrelation();
+ List<ReduceSinkOperator> peerReduceSinkOperators =
+ CorrelationOptimizerUtils.findPeerReduceSinkOperators(op);
+ List<ReduceSinkOperator> bottomReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+ for (ReduceSinkOperator rsop : peerReduceSinkOperators) {
+ Set<ReduceSinkOperator> thisBottomReduceSinkOperators = exploitJFC(rsop,
+ correlationCtx, correlation);
+ if (thisBottomReduceSinkOperators.size() == 0) {
+ thisBottomReduceSinkOperators.add(rsop);
+ } else {
+ boolean isClear = false;
+ // bottom ReduceSinkOperators are those ReduceSinkOperators which are close to
+ // TableScanOperators
+ for (ReduceSinkOperator bottomRSop : thisBottomReduceSinkOperators) {
+ TableScanOperator tsop = findTableScanOperator(bottomRSop);
+ if (tsop == null) {
+ isClear = true; // currently the optimizer can only optimize correlations involving
+ // source tables (input tables)
+ } else {
+ // Top ReduceSinkOperators are those ReduceSinkOperators which are close to
+ // FileSinkOperators
+ if (!correlation.getTopRSopToTSops().containsKey(rsop)) {
+ correlation.getTopRSopToTSops().put(rsop, new ArrayList<TableScanOperator>());
+ }
+ correlation.getTopRSopToTSops().get(rsop).add(tsop);
+
+ if (!correlation.getBottomRSopToTSops().containsKey(bottomRSop)) {
+ correlation.getBottomRSopToTSops().put(bottomRSop,
+ new ArrayList<TableScanOperator>());
+ }
+ correlation.getBottomRSopToTSops().get(bottomRSop).add(tsop);
+ }
+ }
+ if (isClear) {
+ thisBottomReduceSinkOperators.clear();
+ thisBottomReduceSinkOperators.add(rsop);
+ }
+ }
+ bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators);
+ }
+
+ if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) {
+ LOG.info("has job flow correlation");
+ correlation.setJobFlowCorrelation(true);
+ correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators);
+ annotateOpPlan(correlation);
+ }
+
+ if (correlation.hasJobFlowCorrelation()) {
+ boolean hasICandTC = findICandTC(correlation);
+ LOG.info("has input correlation and transit correlation? " + hasICandTC);
+ correlation.setInputCorrelation(hasICandTC);
+ correlation.setTransitCorrelation(hasICandTC);
+ boolean hasSelfJoin = hasSelfJoin(correlation);
+ LOG.info("has self-join? " + hasSelfJoin);
+ correlation.setInvolveSelfJoin(hasSelfJoin);
+ // TODO: support cases involving self-join. For self-join related operation paths, after the
+ // correlation dispatch operator, each path should be filtered by a filter operator
+ if (!hasSelfJoin) {
+ LOG.info("correlation detected");
+ correlationCtx.addCorrelation(correlation);
+ } else {
+ LOG.info("correlation discarded. The current optimizer cannot optimize self-join");
+ }
+ }
+ correlationCtx.addWalkedAll(peerReduceSinkOperators);
+ return null;
+ }
+
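+ // A self-join is present when more than one correlated ReduceSinkOperator reading the same
+ // table share the same child operator (i.e. their peer sets intersect).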
+ private boolean hasSelfJoin(IntraQueryCorrelation correlation) {
+ boolean hasSelfJoin = false;
+ for (Entry<String, List<ReduceSinkOperator>> entry : correlation
+ .getTableToCorrelatedRSops().entrySet()) {
+ for (ReduceSinkOperator rsop : entry.getValue()) {
+ Set<ReduceSinkOperator> intersection = new HashSet<ReduceSinkOperator>(
+ CorrelationOptimizerUtils.findPeerReduceSinkOperators(rsop));
+ intersection.retainAll(entry.getValue());
+ // if self-join is involved
+ if (intersection.size() > 1) {
+ hasSelfJoin = true;
+ return hasSelfJoin;
+ }
+ }
+ }
+ return hasSelfJoin;
+ }
+
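+ // Input correlation and transit correlation exist when more than one correlated bottom
+ // ReduceSinkOperator reads the same source table; group the bottom operators by table to check.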
+ private boolean findICandTC(IntraQueryCorrelation correlation) {
+
+ boolean hasICandTC = false;
+ Map<String, List<ReduceSinkOperator>> table2RSops =
+ new HashMap<String, List<ReduceSinkOperator>>();
+ Map<String, List<TableScanOperator>> table2TSops =
+ new HashMap<String, List<TableScanOperator>>();
+
+ for (Entry<ReduceSinkOperator, List<TableScanOperator>> entry : correlation
+ .getBottomRSopToTSops().entrySet()) {
+ String tbl = aliastoTabName.get(entry.getValue().get(0).getConf().getAlias());
+ if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)) {
+ table2RSops.put(tbl, new ArrayList<ReduceSinkOperator>());
+ table2TSops.put(tbl, new ArrayList<TableScanOperator>());
+ }
+ assert entry.getValue().size() == 1;
+ table2RSops.get(tbl).add(entry.getKey());
+ table2TSops.get(tbl).add(entry.getValue().get(0));
+ }
+
+ for (Entry<String, List<ReduceSinkOperator>> entry : table2RSops.entrySet()) {
+ if (entry.getValue().size() > 1) {
+ hasICandTC = true;
+ break;
+ }
+ }
+ correlation.setICandTCCorrelation(table2RSops, table2TSops);
+ return hasICandTC;
+ }
+ }
+
+ private NodeProcessor getDefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
+ LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " "
+ + ((Operator) nd).getName() + ". No actual work to do");
+ CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ if (op.getName().equals(MapJoinOperator.getOperatorName())) {
+ correlationCtx.setAbort(true);
+ correlationCtx.getAbortReasons().add("Found MAPJOIN");
+ }
+ if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+ correlationCtx.incrementFileSinkOperatorCount();
+ }
+ return null;
+ }
+ };
+ }
+
+ public class IntraQueryCorrelation {
+
+ private final Map<ReduceSinkOperator, List<ReduceSinkOperator>> downstreamRSopToUpstreamRSops =
+ new HashMap<ReduceSinkOperator, List<ReduceSinkOperator>>();
+ private final Map<ReduceSinkOperator, List<ReduceSinkOperator>> upstreamToDownstreamRSops =
+ new HashMap<ReduceSinkOperator, List<ReduceSinkOperator>>();
+
+ private final Map<ReduceSinkOperator, List<TableScanOperator>> topRSopToTSops =
+ new HashMap<ReduceSinkOperator, List<TableScanOperator>>();
+ private final Map<ReduceSinkOperator, List<TableScanOperator>> bottomRSopToTSops =
+ new HashMap<ReduceSinkOperator, List<TableScanOperator>>();
+
+ private List<ReduceSinkOperator> topReduceSinkOperators;
+ private List<ReduceSinkOperator> bottomReduceSinkOperators;
+
+ private Map<String, List<ReduceSinkOperator>> tableToCorrelatedRSops;
+
+ private Map<String, List<TableScanOperator>> tableToCorrelatedTSops;
+
+ private Map<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap;
+
+ private final Map<Integer, Map<Integer, List<Integer>>> dispatchConf =
+ new HashMap<Integer, Map<Integer, List<Integer>>>(); // inputTag->(Child->outputTag)
+ private final Map<Integer, Map<Integer, List<SelectDesc>>> dispatchValueSelectDescConf =
+ new HashMap<Integer, Map<Integer, List<SelectDesc>>>(); // inputTag->(Child->SelectDesc)
+ private final Map<Integer, Map<Integer, List<SelectDesc>>> dispatchKeySelectDescConf =
+ new HashMap<Integer, Map<Integer, List<SelectDesc>>>(); // inputTag->(Child->SelectDesc)
+
+ private final Set<ReduceSinkOperator> allReduceSinkOperators =
+ new HashSet<ReduceSinkOperator>();
+
+ public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) {
+ allReduceSinkOperators.add(rsop);
+ }
+
+ public Set<ReduceSinkOperator> getAllReduceSinkOperators() {
+ return allReduceSinkOperators;
+ }
+
+ public Map<Integer, Map<Integer, List<Integer>>> getDispatchConf() {
+ return dispatchConf;
+ }
+
+ public Map<Integer, Map<Integer, List<SelectDesc>>> getDispatchValueSelectDescConf() {
+ return dispatchValueSelectDescConf;
+ }
+
+ public Map<Integer, Map<Integer, List<SelectDesc>>> getDispatchKeySelectDescConf() {
+ return dispatchKeySelectDescConf;
+ }
+
+ public void addOperationPathToDispatchConf(Integer opPlan) {
+ if (!dispatchConf.containsKey(opPlan)) {
+ dispatchConf.put(opPlan, new HashMap<Integer, List<Integer>>());
+ }
+ }
+
+ public Map<Integer, List<Integer>> getDispatchConfForOperationPath(Integer opPlan) {
+ return dispatchConf.get(opPlan);
+ }
+
+ public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) {
+ if (!dispatchValueSelectDescConf.containsKey(opPlan)) {
+ dispatchValueSelectDescConf.put(opPlan, new HashMap<Integer, List<SelectDesc>>());
+ }
+ }
+
+ public Map<Integer, List<SelectDesc>> getDispatchValueSelectDescConfForOperationPath(
+ Integer opPlan) {
+ return dispatchValueSelectDescConf.get(opPlan);
+ }
+
+ public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) {
+ if (!dispatchKeySelectDescConf.containsKey(opPlan)) {
+ dispatchKeySelectDescConf.put(opPlan, new HashMap<Integer, List<SelectDesc>>());
+ }
+ }
+
+ public Map<Integer, List<SelectDesc>> getDispatchKeySelectDescConfForOperationPath(
+ Integer opPlan) {
+ return dispatchKeySelectDescConf.get(opPlan);
+ }
+
+ private boolean inputCorrelation = false;
+ private boolean transitCorrelation = false;
+ private boolean jobFlowCorrelation = false;
+
+ public void setBottomReduceSink2OperationPathMap(
+ Map<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap) {
+ this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap;
+ }
+
+ public Map<ReduceSinkOperator, Integer> getBottomReduceSink2OperationPathMap() {
+ return bottomReduceSink2OperationPathMap;
+ }
+
+ public void setInputCorrelation(boolean inputCorrelation) {
+ this.inputCorrelation = inputCorrelation;
+ }
+
+ public boolean hasInputCorrelation() {
+ return inputCorrelation;
+ }
+
+ public void setTransitCorrelation(boolean transitCorrelation) {
+ this.transitCorrelation = transitCorrelation;
+ }
+
+ public boolean hasTransitCorrelation() {
+ return transitCorrelation;
+ }
+
+ public void setJobFlowCorrelation(boolean jobFlowCorrelation) {
+ this.jobFlowCorrelation = jobFlowCorrelation;
+ }
+
+ public boolean hasJobFlowCorrelation() {
+ return jobFlowCorrelation;
+ }
+
+ public Map<ReduceSinkOperator, List<TableScanOperator>> getTopRSopToTSops() {
+ return topRSopToTSops;
+ }
+
+ public Map<ReduceSinkOperator, List<TableScanOperator>> getBottomRSopToTSops() {
+ return bottomRSopToTSops;
+ }
+
+ public Map<ReduceSinkOperator, List<ReduceSinkOperator>> getDownstreamToUpStreamRSops() {
+ return downstreamRSopToUpstreamRSops;
+ }
+
+ public Map<ReduceSinkOperator, List<ReduceSinkOperator>> getUpstreamToDownstreamRSops() {
+ return upstreamToDownstreamRSops;
+ }
+
+ public void setJFCCorrelation(List<ReduceSinkOperator> peerReduceSinkOperators,
+ List<ReduceSinkOperator> bottomReduceSinkOperators) {
+ this.topReduceSinkOperators = peerReduceSinkOperators;
+ this.bottomReduceSinkOperators = bottomReduceSinkOperators;
+ }
+
+ public List<ReduceSinkOperator> getTopReduceSinkOperators() {
+ return topReduceSinkOperators;
+ }
+
+ public List<ReduceSinkOperator> getBottomReduceSinkOperators() {
+ return bottomReduceSinkOperators;
+ }
+
+ public void setICandTCCorrelation(Map<String, List<ReduceSinkOperator>> tableToRSops,
+ Map<String, List<TableScanOperator>> tableToTSops) {
+ this.tableToCorrelatedRSops = tableToRSops;
+ this.tableToCorrelatedTSops = tableToTSops;
+ }
+
+ public Map<String, List<ReduceSinkOperator>> getTableToCorrelatedRSops() {
+ return tableToCorrelatedRSops;
+ }
+
+ public Map<String, List<TableScanOperator>> getTableToCorrelatedTSops() {
+ return tableToCorrelatedTSops;
+ }
+
+ private boolean isInvolveSelfJoin = false;
+
+ public boolean isInvolveSelfJoin() {
+ return isInvolveSelfJoin;
+ }
+
+ public void setInvolveSelfJoin(boolean isInvolveSelfJoin) {
+ this.isInvolveSelfJoin = isInvolveSelfJoin;
+ }
+
+ }
+
+ private class CorrelationNodeProcCtx implements NodeProcessorCtx {
+
+ private boolean abort;
+
+ private final List<String> abortReasons;
+
+ private final Set<ReduceSinkOperator> walked;
+
+ private final List<IntraQueryCorrelation> correlations;
+
+ private int fileSinkOperatorCount;
+
+ public CorrelationNodeProcCtx() {
+ walked = new HashSet<ReduceSinkOperator>();
+ correlations = new ArrayList<IntraQueryCorrelation>();
+ abort = false;
+ abortReasons = new ArrayList<String>();
+ fileSinkOperatorCount = 0;
+ }
+
+ public void setAbort(boolean abort) {
+ this.abort = abort;
+ }
+
+ public boolean isAbort() {
+ return abort;
+ }
+
+ public List<String> getAbortReasons() {
+ return abortReasons;
+ }
+
+ public void addCorrelation(IntraQueryCorrelation correlation) {
+ correlations.add(correlation);
+ }
+
+ public List<IntraQueryCorrelation> getCorrelations() {
+ return correlations;
+ }
+
+ public boolean isWalked(ReduceSinkOperator op) {
+ return walked.contains(op);
+ }
+
+ public void addWalked(ReduceSinkOperator op) {
+ walked.add(op);
+ }
+
+ public void addWalkedAll(Collection<ReduceSinkOperator> c) {
+ walked.addAll(c);
+ }
+
+ public void incrementFileSinkOperatorCount() {
+ fileSinkOperatorCount++;
+ if (fileSinkOperatorCount == 2) {
+ abort = true;
+ abortReasons
+ .add("-- Currently, a query with multiple FileSinkOperators are not supported.");
+ }
+ }
+
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
new file mode 100644
index 0000000..09ebb51
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
@@ -0,0 +1,772 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator;
+import org.apache.hadoop.hive.ql.exec.CorrelationLocalSimulativeReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.ForwardDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+
+
+public final class CorrelationOptimizerUtils {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName());
+
+ public static boolean isExisted(ExprNodeDesc expr, List<ExprNodeDesc> col_list) {
+ for (ExprNodeDesc thisExpr : col_list) {
+ if (expr.getExprString().equals(thisExpr.getExprString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static String getColumnName(Map<String, ExprNodeDesc> opColumnExprMap, ExprNodeDesc expr) {
+ for (Entry<String, ExprNodeDesc> entry : opColumnExprMap.entrySet()) {
+ if (expr.getExprString().equals(entry.getValue().getExprString())) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+
+ public static Operator<? extends OperatorDesc> unionUsedColumnsAndMakeNewSelect(
+ List<ReduceSinkOperator> rsops, IntraQueryCorrelation correlation,
+ TableScanOperator input,
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx)
+ throws SemanticException {
+
+ ArrayList<String> columnNames = new ArrayList<String>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> col_list = new ArrayList<ExprNodeDesc>();
+ RowResolver out_rwsch = new RowResolver();
+ boolean isSelectAll = false;
+
+ int pos = 0;
+ for (ReduceSinkOperator rsop : rsops) {
+ List<TableScanOperator> tsops = correlation.getBottomRSopToTSops().get(rsop);
+ // a bottom ReduceSinkOperator should have only one corresponding TableScanOperator.
+ assert tsops.size() == 1;
+ Queue<Operator<? extends OperatorDesc>> opToBeVisited =
+ new LinkedList<Operator<? extends OperatorDesc>>(tsops.get(0).getChildOperators());
+
+ Operator<? extends OperatorDesc> curr = opToBeVisited.poll();
+ assert curr != null;
+ while (curr != null) {
+ if (curr.getName().equals(SelectOperator.getOperatorName())) {
+ SelectOperator selOp = (SelectOperator) curr;
+ if (selOp.getColumnExprMap() != null) {
+ for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
+ ExprNodeDesc expr = entry.getValue();
+ if (!isExisted(expr, col_list)) {
+ String outputName = entry.getKey();
+ String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().
+ reverseLookup(outputName);
+ if (colRef == null) {
+ continue;
+ }
+ col_list.add(expr);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ pos++;
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ }
+ }
+ } else {
+ for (ExprNodeDesc expr : selOp.getConf().getColList()) {
+ if (!isExisted(expr, col_list)) {
+ String outputName = expr.getCols().get(0);
+ String[] colRef = originalOpParseCtx.get(selOp).getRowResolver()
+ .reverseLookup(outputName);
+ if (colRef == null) {
+ continue;
+ }
+ col_list.add(expr);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ }
+ break;
+ } else if (curr.getName().equals(FilterOperator.getOperatorName())) {
+ // reached a FilterOperator before reaching a SelectOperator or ReduceSinkOperator
+ isSelectAll = true;
+ break;
+ } else if (curr.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ ReduceSinkOperator thisRSop = (ReduceSinkOperator) curr;
+ for (ExprNodeDesc expr : thisRSop.getConf().getKeyCols()) {
+ if (!isExisted(expr, col_list)) {
+ assert expr.getCols().size() == 1;
+ String columnName = getColumnName(thisRSop.getColumnExprMap(), expr);
+ String[] colRef = originalOpParseCtx.get(thisRSop).getRowResolver()
+ .reverseLookup(columnName);
+ if (colRef == null) {
+ continue;
+ }
+ col_list.add(expr);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ for (ExprNodeDesc expr : thisRSop.getConf().getValueCols()) {
+ if (!isExisted(expr, col_list)) {
+ assert expr.getCols().size() == 1;
+ String columnName = getColumnName(thisRSop.getColumnExprMap(), expr);
+ String[] colRef = originalOpParseCtx.get(thisRSop).getRowResolver()
+ .reverseLookup(columnName);
+ if (colRef == null) {
+ continue;
+ }
+ col_list.add(expr);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ break;
+ } else {
+ opToBeVisited.addAll(curr.getChildOperators());
+ }
+ curr = opToBeVisited.poll();
+ }
+ }
+
+ Operator<? extends OperatorDesc> output;
+ if (isSelectAll) {
+ output = input;
+ } else {
+ output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new SelectDesc(col_list, columnNames, false), new RowSchema(
+ out_rwsch.getColumnInfos()), input), out_rwsch, originalOpParseCtx);
+ output.setColumnExprMap(colExprMap);
+ output.setChildOperators(Utilities.makeList());
+ }
+
+ return output;
+ }
+
+
+ public static Operator<? extends OperatorDesc> putOpInsertMap(
+ Operator<? extends OperatorDesc> op,
+ RowResolver rr, LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx) {
+ OpParseContext ctx = new OpParseContext(rr);
+ opParseCtx.put(op, ctx);
+ op.augmentPlan();
+ return op;
+ }
+
+ public static Map<Operator<? extends OperatorDesc>, String> getAliasIDtTopOps(
+ Map<String, Operator<? extends OperatorDesc>> topOps) {
+ Map<Operator<? extends OperatorDesc>, String> aliasIDtTopOps =
+ new HashMap<Operator<? extends OperatorDesc>, String>();
+ for (Entry<String, Operator<? extends OperatorDesc>> entry : topOps.entrySet()) {
+ assert !aliasIDtTopOps.containsKey(entry.getValue());
+ aliasIDtTopOps.put(entry.getValue(), entry.getKey());
+ }
+ return aliasIDtTopOps;
+ }
+
+ /**
+ * Find all peer ReduceSinkOperators of op, i.e. the ReduceSinkOperators that share the same
+ * child operator as op (op itself is included).
+ */
+ public static List