diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5efae89..2dc3886 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -494,6 +494,7 @@ public class HiveConf extends Configuration {
HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+ HIVEOPTCORRELATION("hive.optimize.correlation", true), // exploit intra-query correlations
// Indexes
HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G
diff --git conf/hive-site.xml conf/hive-site.xml
index dab494e..34ecab9 100644
--- conf/hive-site.xml
+++ conf/hive-site.xml
@@ -19,4 +19,9 @@
+<property>
+ <name>hive.optimize.correlation</name>
+ <value>true</value>
+</property>
+
diff --git data/conf/hive-site.xml data/conf/hive-site.xml
index 907d333..496a83c 100644
--- data/conf/hive-site.xml
+++ data/conf/hive-site.xml
@@ -176,4 +176,9 @@
<description>The default input format, if it is not specified, the system assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always overwrite it - if there is a bug in CombineHiveInputFormat, it can always be manually set to HiveInputFormat.</description>
+<property>
+ <name>hive.optimize.correlation</name>
+ <value>true</value>
+</property>
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
new file mode 100644
index 0000000..aa220a9
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BaseReduceSinkOperator holds the key/value/partition evaluation and serialization logic that is
+ * shared by ReduceSinkOperator and CorrelationLocalSimulativeReduceSinkOperator.
+ **/
+public abstract class BaseReduceSinkOperator<T extends BaseReduceSinkDesc> extends
+ TerminalOperator<T>
+ implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ protected static final Log LOG = LogFactory.getLog(BaseReduceSinkOperator.class
+ .getName());
+
+ /**
+ * The evaluators for the key columns. Key columns decide the sort order on
+ * the reducer side. Key columns are passed to the reducer in the "key".
+ */
+ protected transient ExprNodeEvaluator[] keyEval;
+ /**
+ * The evaluators for the value columns. Value columns are passed to reducer
+ * in the "value".
+ */
+ protected transient ExprNodeEvaluator[] valueEval;
+ /**
+ * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+ * Hive language). Partition columns decide the reducer that the current row
+ * goes to. Partition columns are not passed to reducer.
+ */
+ protected transient ExprNodeEvaluator[] partitionEval;
+
+ // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
+ // ready
+ transient Serializer keySerializer;
+ transient boolean keyIsText;
+ transient Serializer valueSerializer;
+ transient int tag;
+ transient byte[] tagByte = new byte[1];
+ transient protected int numDistributionKeys;
+ transient protected int numDistinctExprs;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+
+ try {
+ keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getKeyCols()) {
+ keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ numDistributionKeys = conf.getNumDistributionKeys();
+ distinctColIndices = conf.getDistinctColumnIndices();
+ numDistinctExprs = distinctColIndices.size();
+
+ valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getValueCols()) {
+ valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getPartitionCols()) {
+ partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ tag = conf.getTag();
+ tagByte[0] = (byte) tag;
+ LOG.info("Using tag = " + tag);
+
+ TableDesc keyTableDesc = conf.getKeySerializeInfo();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
+ keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ TableDesc valueTableDesc = conf.getValueSerializeInfo();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ firstRow = true;
+ initializeChildren(hconf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ transient InspectableObject tempInspectableObject = new InspectableObject();
+ transient HiveKey keyWritable = new HiveKey();
+ transient Writable value;
+
+ transient StructObjectInspector keyObjectInspector;
+ transient StructObjectInspector valueObjectInspector;
+ transient ObjectInspector[] partitionObjectInspectors;
+
+ transient Object[][] cachedKeys;
+ transient Object[] cachedValues;
+ transient List<List<Integer>> distinctColIndices;
+
+ boolean firstRow;
+
+ transient Random random;
+
+ /**
+ * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
+ * column indices for group by.
+ * Puts the return values into a StructObjectInspector with output column
+ * names.
+ *
+ * If distinctColIndices is empty, the object inspector is same as
+ * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
+ */
+ protected static StructObjectInspector initEvaluatorsAndReturnStruct(
+ ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
+ List<String> outputColNames,
+ int length, ObjectInspector rowInspector)
+ throws HiveException {
+ int inspectorLen = evals.length > length ? length + 1 : evals.length;
+ List<ObjectInspector> sois = new ArrayList<ObjectInspector>(inspectorLen);
+
+ // keys
+ ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
+ sois.addAll(Arrays.asList(fieldObjectInspectors));
+
+ if (evals.length > length) {
+ // union keys
+ List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
+ for (List<Integer> distinctCols : distinctColIndices) {
+ List<String> names = new ArrayList<String>();
+ List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
+ int numExprs = 0;
+ for (int i : distinctCols) {
+ names.add(HiveConf.getColumnInternalName(numExprs));
+ eois.add(evals[i].initialize(rowInspector));
+ numExprs++;
+ }
+ uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
+ }
+ UnionObjectInspector uoi =
+ ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
+ sois.add(uoi);
+ }
+ return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois);
+ }
+
+ @Override
+ public abstract void processOp(Object row, int tag) throws HiveException;
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return "BaseReduceSink";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return null;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
new file mode 100644
index 0000000..13efa7f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Correlation composite operator implementation. This operator is used only in the map phase to
+ * share a table scan. Suppose that multiple operation paths (e.g. two different predicates on a
+ * table) share a common input table; every row is then processed by each of these operation paths.
+ * CorrelationCompositeOperator records which operation paths actually forward a given row: it
+ * buffers the rows forwarded by its parents and tags the row with an operation path tag indicating
+ * which paths forwarded it. Since the operation path tag used in ReduceSinkOperator is a single
+ * byte, this operator can currently have at most 8 parents (operation paths). For example, suppose
+ * the common table is T and predicates P1 and P2 are used in sub-queries SQ1 and SQ2,
+ * respectively. The CorrelationCompositeOperator applies P1 and P2 to each row and tags the record
+ * according to whether P1 or P2 holds.
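+ *
+ * As an illustration (hypothetical tag values, not taken from this patch): if the optimizer
+ * assigned operation path tags 0 and 1 to the P1 and P2 paths, a row satisfying only P1 is tagged
+ * 0b00000001, a row satisfying only P2 is tagged 0b00000010, a row satisfying both is tagged
+ * 0b00000011, and a row satisfying neither is not forwarded at all.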
+ **/
+public class CorrelationCompositeOperator extends Operator<CorrelationCompositeDesc> implements
+Serializable {
+
+ private static final Log LOG = LogFactory.getLog(CorrelationCompositeOperator.class.getName());
+
+ public static enum Counter {
+ FORWARDED
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private ReduceSinkOperator correspondingReduceSinkOperators;
+
+ private transient final LongWritable forwarded_count;
+
+ private transient boolean firstRow;
+
+ private int[] allOperationPathTags;
+
+ private Object[] rowBuffer; // buffer the output from multiple parents
+
+ public CorrelationCompositeOperator() {
+ super();
+ forwarded_count = new LongWritable();
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ firstRow = true;
+ rowBuffer = new Object[parentOperators.size()];
+ correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator();
+ allOperationPathTags = conf.getAllOperationPathTags();
+ statsMap.put(Counter.FORWARDED, forwarded_count);
+ outputObjInspector =
+ ObjectInspectorUtils.getStandardObjectInspector(outputObjInspector,
+ ObjectInspectorCopyOption.JAVA);
+
+ // initialize its children
+ initializeChildren(hconf);
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ rowBuffer[tag] =
+ ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[tag],
+ ObjectInspectorCopyOption.JAVA);
+ }
+
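+ /**
+ * Evaluates the outputs buffered for the previous row of the shared table: every parent that
+ * forwarded that row has left a non-null entry in rowBuffer, so the matching operation path tags
+ * are handed to the corresponding ReduceSinkOperator and a single copy of the row is forwarded.
+ * Invoked from setRowNumber when the table scan advances to a new row, and once more from
+ * closeOp for the last buffered row.
+ */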
+ private void evaluateBuffer() {
+ ArrayList<Integer> operationPathTags = new ArrayList<Integer>();
+ boolean isForward = false;
+ Object forwardedRow = null;
+ for (int i = 0; i < rowBuffer.length; i++) {
+ if (rowBuffer[i] != null) {
+ isForward = true;
+ operationPathTags.add(allOperationPathTags[i]);
+ if (forwardedRow == null) {
+ forwardedRow = rowBuffer[i];
+ }
+ }
+ }
+ if (isForward) {
+ assert correspondingReduceSinkOperators != null;
+ correspondingReduceSinkOperators.setOperationPathTags(operationPathTags);
+ forwarded_count.set(forwarded_count.get() + 1);
+ try {
+ forward(forwardedRow, null);
+ } catch (HiveException e) {
+ // a failed forward means lost rows, so propagate the error instead of swallowing it
+ throw new RuntimeException(e);
+ }
+ }
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ }
+
+ @Override
+ public void setRowNumber(long rowNumber) {
+ this.rowNumber = rowNumber;
+ if (childOperators == null) {
+ return;
+ }
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ assert rowNumber >= childOperatorsArray[i].getRowNumber();
+ if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+ childOperatorsArray[i].setRowNumber(rowNumber);
+ }
+ }
+ if (firstRow) {
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ firstRow = false;
+ } else {
+ evaluateBuffer();
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort) {
+ evaluateBuffer();
+ }
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CCO";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return null;
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
new file mode 100644
index 0000000..7d56da8
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * CorrelationLocalSimulativeReduceSinkOperator simulates a ReduceSinkOperator and sends its output
+ * to another operator (JOIN or GBY). It is used only in the reduce phase; essentially, it is a
+ * bridge from one JOIN or GBY operator to another JOIN or GBY operator. A
+ * CorrelationLocalSimulativeReduceSinkOperator takes care of the startGroup and endGroup actions
+ * of its succeeding JOIN or GBY operator.
+ * Example: a query involves a JOIN operator and a GBY operator, and the GBY operator consumes the
+ * output of the JOIN operator. If the join keys and the group-by keys are the same, the data does
+ * not need to be shuffled again, since it has already been partitioned by the JOIN operator. In
+ * that case the CorrelationOptimizer replaces the ReduceSinkOperator between the JOIN and GBY
+ * operators with a CorrelationLocalSimulativeReduceSinkOperator, and the JOIN and GBY operators
+ * are executed in a single reduce phase.
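+ *
+ * For instance, a query shaped like
+ * SELECT t1.key, count(1) FROM t1 JOIN t2 ON (t1.key = t2.key) GROUP BY t1.key
+ * (an illustrative query, not taken from this patch) joins and aggregates on the same key, so the
+ * JOIN output is already partitioned correctly for the GROUP BY and both operators can run in one
+ * reduce phase with this operator between them.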
+ **/
+public class CorrelationLocalSimulativeReduceSinkOperator
+extends BaseReduceSinkOperator<CorrelationLocalSimulativeReduceSinkDesc> {
+
+ private static final long serialVersionUID = 1L;
+
+ transient TableDesc keyTableDesc;
+ transient TableDesc valueTableDesc;
+
+ transient Deserializer inputKeyDeserializer;
+
+ transient SerDe inputValueDeserializer;
+
+ transient ByteWritable tagWritable;
+
+ transient ObjectInspector outputKeyObjectInspector;
+ transient ObjectInspector outputValueObjectInspector;
+ transient ObjectInspector[] outputPartitionObjectInspectors;
+
+ private ArrayList<Object> forwardedRow;
+ private Object keyObject;
+ private Object valueObject;
+
+ private BytesWritable groupKey;
+
+ private static String[] fieldNames;
+
+ static {
+ ArrayList<String> fieldNameArray = new ArrayList<String>();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+ fieldNameArray.add(r.toString());
+ }
+ fieldNames = fieldNameArray.toArray(new String[0]);
+ }
+
+ public CorrelationLocalSimulativeReduceSinkOperator() {
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ forwardedRow = new ArrayList<Object>(3);
+ tagByte = new byte[1];
+ tagWritable = new ByteWritable();
+ tempInspectableObject = new InspectableObject();
+ keyWritable = new HiveKey();
+ assert childOperatorsArray.length == 1;
+ try {
+ keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getKeyCols()) {
+ keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ numDistributionKeys = conf.getNumDistributionKeys();
+ distinctColIndices = conf.getDistinctColumnIndices();
+ numDistinctExprs = distinctColIndices.size();
+
+ valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getValueCols()) {
+ valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ tag = conf.getTag();
+ tagByte[0] = (byte) tag;
+ tagWritable.set(tagByte[0]);
+ LOG.info("Using tag = " + tag);
+
+ TableDesc keyTableDesc = conf.getKeySerializeInfo();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
+ keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+ .getDeserializerClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+ outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ TableDesc valueTableDesc = conf.getValueSerializeInfo();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ inputValueDeserializer = (SerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getDeserializerClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc
+ .getProperties());
+ outputValueObjectInspector = inputValueDeserializer.getObjectInspector();
+
+ ObjectInspector rowInspector = inputObjInspectors[0];
+
+ keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
+ distinctColIndices,
+ conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
+ valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
+ .getOutputValueColumnNames(), rowInspector);
+ int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+ int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
+ numDistributionKeys;
+ cachedKeys = new Object[numKeys][keyLen];
+ cachedValues = new Object[valueEval.length];
+ assert cachedKeys.length == 1;
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(outputKeyObjectInspector);
+ ois.add(outputValueObjectInspector);
+ ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+
+ LOG.info("Simulative ReduceSink inputObjInspectors"
+ + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
+
+ LOG.info("Simulative ReduceSink outputObjInspectors "
+ + this.getChildOperators().get(0).getParentOperators().indexOf(this) +
+ " " + ((StructObjectInspector) outputObjInspector).getTypeName());
+
+ initializeChildren(hconf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ try {
+ // Evaluate the value
+ for (int i = 0; i < valueEval.length; i++) {
+ cachedValues[i] = valueEval[i].evaluate(row);
+ }
+ // Serialize the value
+ value = valueSerializer.serialize(cachedValues, valueObjectInspector);
+ valueObject = inputValueDeserializer.deserialize(value);
+
+ // Evaluate the keys
+ Object[] distributionKeys = new Object[numDistributionKeys];
+ for (int i = 0; i < numDistributionKeys; i++) {
+ distributionKeys[i] = keyEval[i].evaluate(row);
+ }
+
+ if (numDistinctExprs > 0) {
+ // with distinct key(s)
+ for (int i = 0; i < numDistinctExprs; i++) {
+ System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys);
+ Object[] distinctParameters =
+ new Object[distinctColIndices.get(i).size()];
+ for (int j = 0; j < distinctParameters.length; j++) {
+ distinctParameters[j] =
+ keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
+ }
+ cachedKeys[i][numDistributionKeys] =
+ new StandardUnion((byte) i, distinctParameters);
+ }
+ } else {
+ // no distinct key
+ System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+ }
+
+ for (int i = 0; i < cachedKeys.length; i++) {
+ if (keyIsText) {
+ Text key = (Text) keySerializer.serialize(cachedKeys[i],
+ keyObjectInspector);
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ } else {
+ // Must be BytesWritable
+ BytesWritable key = (BytesWritable) keySerializer.serialize(
+ cachedKeys[i], keyObjectInspector);
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ }
+ if (!keyWritable.equals(groupKey)) {
+ try {
+ keyObject = inputKeyDeserializer.deserialize(keyWritable);
+ } catch (Exception e) {
+ throw new HiveException(
+ "Hive Runtime Error: Unable to deserialize reduce input key from "
+ + Utilities.formatBinaryString(keyWritable.get(), 0,
+ keyWritable.getSize()) + " with properties "
+ + keyTableDesc.getProperties(), e);
+ }
+ if (groupKey == null) { // the first group
+ groupKey = new BytesWritable();
+ } else {
+ // if its child has not been ended, end it
+ if (!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())) {
+ childOperatorsArray[0].endGroup();
+ }
+ }
+ groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+ if (!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())) {
+ childOperatorsArray[0].startGroup();
+ childOperatorsArray[0].setGroupKeyObject(keyObject);
+ childOperatorsArray[0].setBytesWritableGroupKey(groupKey);
+ }
+ }
+ forwardedRow.clear();
+ forwardedRow.add(keyObject);
+ forwardedRow.add(valueObject);
+ forwardedRow.add(tagWritable);
+ forward(forwardedRow, outputObjInspector);
+ }
+ } catch (SerDeException e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort) {
+ if (childOperatorsArray[0].allInitializedParentsAreClosed()) {
+ childOperatorsArray[0].endGroup();
+ }
+ }
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ // do nothing
+ }
+
+ @Override
+ public void endGroup() throws HiveException {
+ // do nothing
+ }
+
+ @Override
+ public void setGroupKeyObject(Object keyObject) {
+ // do nothing
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CLSReduceSink";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return null;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
new file mode 100644
index 0000000..b667494
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+/**
+ * CorrelationReducerDispatchOperator is used in a MapReduce job whose plan has been optimized by
+ * the CorrelationOptimizer. When used, it is the first operator in the reduce phase. When multiple
+ * operation paths have been merged into a single one, it dispatches each record to the
+ * corresponding JOIN or GBY operators. Every child of this operator is associated with a
+ * DispatchHandler, which evaluates the input row of this operator and then selects the
+ * corresponding fields for its associated child.
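+ *
+ * A sketch of the dispatch configuration (hypothetical values): with a single input tag 0 and two
+ * children, dispatchConf could be {0 -> {0 -> [0], 1 -> [0]}}, i.e. a row arriving with input tag
+ * 0 is handed to child 0 under output tag 0 and to child 1 under output tag 0, each after its own
+ * DispatchHandler has projected the key and value fields that child expects.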
+ */
+public class CorrelationReducerDispatchOperator extends Operator<CorrelationReducerDispatchDesc>
+implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static String[] fieldNames;
+ static {
+ ArrayList<String> fieldNameArray = new ArrayList<String>();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+ fieldNameArray.add(r.toString());
+ }
+ fieldNames = fieldNameArray.toArray(new String[0]);
+ }
+
+ protected static class DispatchHandler {
+
+ protected Log l4j = LogFactory.getLog(this.getClass().getName());
+
+ private final ObjectInspector[] inputObjInspector;
+ private ObjectInspector outputObjInspector;
+ private ObjectInspector keyObjInspector;
+ private ObjectInspector valueObjInspector;
+ private final byte inputTag;
+ private final byte outputTag;
+ private final byte childIndx;
+ private final ByteWritable outputTagByteWritable;
+ private final SelectDesc selectDesc;
+ private final SelectDesc keySelectDesc;
+ private ExprNodeEvaluator[] keyEval;
+ private ExprNodeEvaluator[] eval;
+
+ // counters for debugging
+ private transient long cntr = 0;
+ private transient long nextCntr = 1;
+
+ private long getNextCntr(long cntr) {
+ // A very simple counter to keep track of number of rows processed by an
+ // operator. It dumps
+ // every 1 million times, and quickly before that
+ if (cntr >= 1000000) {
+ return cntr + 1000000;
+ }
+ return 10 * cntr;
+ }
+
+ public long getCntr() {
+ return this.cntr;
+ }
+
+ private final Log LOG;
+ private final boolean isLogInfoEnabled;
+ private final String id;
+
+ public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx,
+ byte outputTag,
+ SelectDesc selectDesc, SelectDesc keySelectDesc, Log LOG, String id)
+ throws HiveException {
+ this.inputObjInspector = inputObjInspector;
+ assert this.inputObjInspector.length == 1;
+ this.inputTag = inputTag;
+ this.childIndx = childIndx;
+ this.outputTag = outputTag;
+ this.selectDesc = selectDesc;
+ this.keySelectDesc = keySelectDesc;
+ this.outputTagByteWritable = new ByteWritable(outputTag);
+ this.LOG = LOG;
+ this.isLogInfoEnabled = LOG.isInfoEnabled();
+ this.id = id;
+ init();
+ }
+
+ private void init() throws HiveException {
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ if (keySelectDesc.isSelStarNoCompute()) {
+ ois.add((ObjectInspector) ((ArrayList) inputObjInspector[0]).get(0));
+ } else {
+ ArrayList<ExprNodeDesc> colList = this.keySelectDesc.getColList();
+ keyEval = new ExprNodeEvaluator[colList.size()];
+ for (int k = 0; k < colList.size(); k++) {
+ assert (colList.get(k) != null);
+ keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+ }
+ keyObjInspector =
+ initEvaluatorsAndReturnStruct(keyEval, keySelectDesc
+ .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0])
+ .getAllStructFieldRefs().get(0).getFieldObjectInspector());
+
+ ois.add(keyObjInspector);
+ l4j.info("Key: input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT inputOIForThisTag"
+ + ((StructObjectInspector) inputObjInspector[0]).getTypeName());
+ }
+ if (selectDesc.isSelStarNoCompute()) {
+ ois.add((ObjectInspector) ((ArrayList) inputObjInspector[0]).get(1));
+ } else {
+ ArrayList<ExprNodeDesc> colList = this.selectDesc.getColList();
+ eval = new ExprNodeEvaluator[colList.size()];
+ for (int k = 0; k < colList.size(); k++) {
+ assert (colList.get(k) != null);
+ eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+ }
+ valueObjInspector =
+ initEvaluatorsAndReturnStruct(eval, selectDesc
+ .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0])
+ .getAllStructFieldRefs().get(1).getFieldObjectInspector());
+
+ ois.add(valueObjInspector);
+ l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT inputOIForThisTag"
+ + ((StructObjectInspector) inputObjInspector[0]).getTypeName());
+ }
+ ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+ l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT outputObjInspector"
+ + ((StructObjectInspector) outputObjInspector).getTypeName());
+ }
+
+ public ObjectInspector getOutputObjInspector() {
+ return outputObjInspector;
+ }
+
+ public Object process(Object row) throws HiveException {
+ ArrayList<Object> keyOutput = new ArrayList<Object>(keyEval.length);
+ Object[] valueOutput = new Object[eval.length];
+ ArrayList<Object> outputRow = new ArrayList<Object>(3);
+ List<Object> thisRow = (List<Object>) row;
+ if (keySelectDesc.isSelStarNoCompute()) {
+ outputRow.add(thisRow.get(0));
+ } else {
+ Object key = thisRow.get(0);
+ for (int j = 0; j < keyEval.length; j++) {
+ try {
+ keyOutput.add(keyEval[j].evaluate(key));
+ } catch (HiveException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new HiveException("Error evaluating "
+ + keySelectDesc.getColList().get(j).getExprString(), e);
+ }
+ }
+ outputRow.add(keyOutput);
+ }
+
+ if (selectDesc.isSelStarNoCompute()) {
+ outputRow.add(thisRow.get(1));
+ } else {
+ Object value = thisRow.get(1);
+ for (int j = 0; j < eval.length; j++) {
+ try {
+ valueOutput[j] = eval[j].evaluate(value);
+ } catch (HiveException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new HiveException("Error evaluating "
+ + selectDesc.getColList().get(j).getExprString(), e);
+ }
+ }
+ outputRow.add(valueOutput);
+ }
+ outputRow.add(outputTagByteWritable);
+
+ if (isLogInfoEnabled) {
+ cntr++;
+ if (cntr == nextCntr) {
+ LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", "
+ + outputTag + "), forwarding " + cntr + " rows");
+ nextCntr = getNextCntr(cntr);
+ }
+ }
+ return outputRow;
+ }
+
+ public void printCloseOpLog() {
+ LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", "
+ + outputTag + "), forwarded " + cntr + " rows");
+ }
+ }
+
+ // inputTag -> (child index -> list of output tags)
+ private HashMap<Integer, HashMap<Integer, ArrayList<Integer>>> dispatchConf;
+ // inputTag -> (child index -> list of value SelectDescs)
+ private HashMap<Integer, HashMap<Integer, ArrayList<SelectDesc>>> dispatchValueSelectDescConf;
+ // inputTag -> (child index -> list of key SelectDescs)
+ private HashMap<Integer, HashMap<Integer, ArrayList<SelectDesc>>> dispatchKeySelectDescConf;
+ // inputTag -> (child index -> list of DispatchHandlers)
+ private HashMap<Integer, HashMap<Integer, ArrayList<DispatchHandler>>> dispatchHandlers;
+ // child index -> (outputTag -> DispatchHandler)
+ private HashMap<Integer, HashMap<Integer, DispatchHandler>> child2OutputTag2DispatchHandlers;
+ // child index -> the child's inputObjInspectors
+ private HashMap<Integer, ObjectInspector[]> childInputObjInspectors;
+
+ private int operationPathTag;
+ private int inputTag;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ dispatchConf = conf.getDispatchConf();
+ dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf();
+ dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf();
+ dispatchHandlers = new HashMap<Integer, HashMap<Integer, ArrayList<DispatchHandler>>>();
+ for (Entry<Integer, HashMap<Integer, ArrayList<Integer>>> entry : dispatchConf.entrySet()) {
+ HashMap<Integer, ArrayList<DispatchHandler>> tmp =
+ new HashMap<Integer, ArrayList<DispatchHandler>>();
+ for (Entry<Integer, ArrayList<Integer>> child2outputTag : entry.getValue().entrySet()) {
+ tmp.put(child2outputTag.getKey(), new ArrayList<DispatchHandler>());
+ int indx = 0;
+ for (Integer outputTag : child2outputTag.getValue()) {
+ tmp.get(child2outputTag.getKey()).add(
+ new DispatchHandler(new ObjectInspector[] {inputObjInspectors[entry.getKey()]},
+ entry.getKey().byteValue(), child2outputTag.getKey().byteValue(),
+ outputTag.byteValue(), dispatchValueSelectDescConf.get(entry.getKey())
+ .get(child2outputTag.getKey()).get(indx),
+ dispatchKeySelectDescConf.get(entry.getKey()).get(child2outputTag.getKey())
+ .get(indx), LOG, id));
+ indx++;
+ }
+ }
+ dispatchHandlers.put(entry.getKey(), tmp);
+ }
+
+ child2OutputTag2DispatchHandlers = new HashMap<Integer, HashMap<Integer, DispatchHandler>>();
+ for (Entry<Integer, HashMap<Integer, ArrayList<Integer>>> entry : dispatchConf.entrySet()) {
+ for (Entry<Integer, ArrayList<Integer>> child2outputTag : entry.getValue().entrySet()) {
+ if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) {
+ child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(),
+ new HashMap<Integer, DispatchHandler>());
+ }
+ int indx = 0;
+ for (Integer outputTag : child2outputTag.getValue()) {
+ child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()).
+ put(outputTag,
+ dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx));
+ indx++;
+ }
+ }
+ }
+
+ childInputObjInspectors = new HashMap<Integer, ObjectInspector[]>();
+ for (Entry<Integer, HashMap<Integer, DispatchHandler>> entry : child2OutputTag2DispatchHandlers
+ .entrySet()) {
+ Integer l = Collections.max(entry.getValue().keySet());
+ ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1];
+ for (Entry<Integer, DispatchHandler> e : entry.getValue().entrySet()) {
+ if (e.getKey().intValue() == -1) {
+ assert childObjInspectors.length == 1;
+ childObjInspectors[0] = e.getValue().getOutputObjInspector();
+ } else {
+ childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector();
+ }
+ }
+ childInputObjInspectors.put(entry.getKey(), childObjInspectors);
+ }
+
+ initializeChildren(hconf);
+ }
+
+ // Each child should have its own outputObjInspector
+ @Override
+ protected void initializeChildren(Configuration hconf) throws HiveException {
+ state = State.INIT;
+ LOG.info("Operator " + id + " " + getName() + " initialized");
+ if (childOperators == null) {
+ return;
+ }
+ LOG.info("Initializing children of " + id + " " + getName());
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " +
+ childOperatorsArray[i].getName() +
+ " " + childInputObjInspectors.get(i).length);
+ childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i));
+ if (reporter != null) {
+ childOperatorsArray[i].setReporter(reporter);
+ }
+ }
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ ArrayList<Object> thisRow = (ArrayList<Object>) row;
+ assert thisRow.size() == 4;
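+ // Row layout produced by ExecReducer when operation paths are tagged:
+ // [key, value, alias tag, operation path tag byte]. The first three fields are forwarded; the
+ // path tag byte decides which children receive the row (see forward below).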
+ operationPathTag = ((ByteWritable) thisRow.get(3)).get();
+ inputTag = ((ByteWritable) thisRow.get(2)).get();
+ forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]);
+ }
+
+ @Override
+ public void forward(Object row, ObjectInspector rowInspector)
+ throws HiveException {
+ if ((++outputRows % 1000) == 0) {
+ if (counterNameToEnum != null) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ outputRows = 0;
+ }
+ }
+
+ if (childOperatorsArray == null && childOperators != null) {
+ throw new HiveException("Internal Hive error during operator initialization.");
+ }
+
+ if ((childOperatorsArray == null) || (getDone())) {
+ return;
+ }
+
+ int childrenDone = 0;
+ int forwardFlag = 1;
+ assert childOperatorsArray.length <= 8;
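+ // Bit i of operationPathTag is set iff operation path i forwarded this row on the map side, so
+ // only the corresponding children receive it, each through its own DispatchHandler(s).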
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ Operator<? extends OperatorDesc> o = childOperatorsArray[i];
+ if (o.getDone()) {
+ childrenDone++;
+ } else {
+ if ((operationPathTag & (forwardFlag << i)) != 0) {
+ for (int j = 0; j < dispatchHandlers.get(inputTag).get(i).size(); j++) {
+ o.process(dispatchHandlers.get(inputTag).get(i).get(j).process(row),
+ dispatchConf.get(inputTag).get(i).get(j));
+ }
+ }
+ }
+ }
+
+ // if all children are done, this operator is also done
+ if (childrenDone == childOperatorsArray.length) {
+ setDone(true);
+ }
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ // log the number of rows forwarded from each dispatcherHandler
+ for (HashMap<Integer, ArrayList<DispatchHandler>> childIndx2DispatchHandlers : dispatchHandlers
+ .values()) {
+ for (ArrayList<DispatchHandler> dispatchHandlers : childIndx2DispatchHandlers.values()) {
+ for (DispatchHandler dispatchHandler : dispatchHandlers) {
+ dispatchHandler.printCloseOpLog();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void setGroupKeyObject(Object keyObject) {
+ this.groupKeyObject = keyObject;
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ op.setGroupKeyObject(keyObject);
+ }
+ }
+
+ @Override
+ public OperatorType getType() {
+ return null;
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CDP";
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
index 283d0b6..0ade890 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
@@ -61,6 +61,7 @@ public class ExecReducer extends MapReduceBase implements Reducer {
private Reporter rp;
private boolean abort = false;
private boolean isTagged = false;
+ private boolean isOperationPathTagged = false; // whether operation paths are tagged
private long cntr = 0;
private long nextCntr = 1;
@@ -116,6 +117,7 @@ public class ExecReducer extends MapReduceBase implements Reducer {
reducer.setParentOperators(null); // clear out any parents as reducer is the
// root
isTagged = gWork.getNeedsTagging();
+ isOperationPathTagged = gWork.getNeedsOperationPathTagging();
try {
keyTableDesc = gWork.getKeyDesc();
inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
@@ -164,8 +166,9 @@ public class ExecReducer extends MapReduceBase implements Reducer {
private BytesWritable groupKey;
- ArrayList<Object> row = new ArrayList<Object>(3);
+ ArrayList<Object> row = new ArrayList<Object>(4);
ByteWritable tag = new ByteWritable();
+ ByteWritable operationPathTags = new ByteWritable();
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
@@ -188,6 +191,14 @@ public class ExecReducer extends MapReduceBase implements Reducer {
keyWritable.setSize(size);
}
+ operationPathTags.set((byte)0);
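+ // When operation paths are tagged, ReduceSinkOperator appends two bytes to the serialized key:
+ // [key bytes][operation path tag][alias tag]. The alias tag was stripped just above, so the
+ // last remaining byte here is the operation path tag.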
+ if (isOperationPathTagged) {
+ // remove the operation path tag
+ int size = keyWritable.getSize() - 1;
+ operationPathTags.set(keyWritable.get()[size]);
+ keyWritable.setSize(size);
+ }
+
if (!keyWritable.equals(groupKey)) {
// If a operator wants to do some work at the beginning of a group
if (groupKey == null) { // the first group
@@ -234,6 +245,7 @@ public class ExecReducer extends MapReduceBase implements Reducer {
row.add(valueObject[tag.get()]);
// The tag is not used any more, we should remove it.
row.add(tag);
+ row.add(operationPathTags);
if (isLogInfoEnabled) {
cntr++;
if (cntr == nextCntr) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index e3ed13a..8eaa47c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -48,7 +48,7 @@ public class JoinOperator extends CommonJoinOperator implements
/**
* SkewkeyTableCounter.
- *
+ *
*/
public static enum SkewkeyTableCounter {
SKEWJOINFOLLOWUPJOBS
@@ -141,7 +141,7 @@ public class JoinOperator extends CommonJoinOperator implements
/**
* All done.
- *
+ *
*/
@Override
public void closeOp(boolean abort) throws HiveException {
@@ -210,6 +210,7 @@ public class JoinOperator extends CommonJoinOperator implements
/**
* This is a similar implementation of FileSinkOperator.moveFileToFinalPath.
+ *
* @param specPath
* @param hconf
* @param success
@@ -218,7 +219,7 @@ public class JoinOperator extends CommonJoinOperator implements
* @throws IOException
* @throws HiveException
*/
- private void mvFileToFinalPath(String specPath, Configuration hconf,
+ private void mvFileToFinalPath(String specPath, Configuration hconf,
boolean success, Log log) throws IOException, HiveException {
FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
@@ -247,7 +248,7 @@ public class JoinOperator extends CommonJoinOperator implements
/**
* Forward a record of join results.
- *
+ *
* @throws HiveException
*/
@Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index f0c35e7..e010d3e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.OutputCollector;
@@ -1359,4 +1360,52 @@ public abstract class Operator implements Serializable,C
return ret;
}
+
+ // bytesWritableGroupKey is only used when a query plan is optimized by the CorrelationOptimizer.
+ // CorrelationLocalSimulativeReduceSinkOperator uses this variable to determine when it needs to
+ // start or end the group for its child operator.
+ protected BytesWritable bytesWritableGroupKey;
+
+ public void setBytesWritableGroupKey(BytesWritable groupKey) {
+ if (bytesWritableGroupKey == null) {
+ bytesWritableGroupKey = new BytesWritable();
+ }
+ bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize());
+ }
+
+ public BytesWritable getBytesWritableGroupKey() {
+ return bytesWritableGroupKey;
+ }
+
+ // The sequence number of the row currently being processed (forwarded down from the table scan
+ // when row-number forwarding is enabled)
+ protected long rowNumber;
+
+ public void initializeRowNumber() {
+ this.rowNumber = 0L;
+ LOG.info("Operator " + id + " " + getName() + " row number initialized to 0");
+ if (childOperators == null) {
+ return;
+ }
+ LOG.info("Initializing row numbers of children of " + id + " " + getName());
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ childOperatorsArray[i].initializeRowNumber();
+ }
+ }
+
+ public void setRowNumber(long rowNumber) {
+ this.rowNumber = rowNumber;
+ if (childOperators == null) {
+ return;
+ }
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ assert rowNumber >= childOperatorsArray[i].getRowNumber();
+ if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+ childOperatorsArray[i].setRowNumber(rowNumber);
+ }
+ }
+ }
+
+ public long getRowNumber() {
+ return rowNumber;
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index 0c22141..064afc9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -22,6 +22,9 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -91,6 +94,12 @@ public final class OperatorFactory {
HashTableDummyOperator.class));
opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
HashTableSinkOperator.class));
+ opvec.add(new OpTuple<CorrelationCompositeDesc>(CorrelationCompositeDesc.class,
+ CorrelationCompositeOperator.class));
+ opvec.add(new OpTuple<CorrelationReducerDispatchDesc>(CorrelationReducerDispatchDesc.class,
+ CorrelationReducerDispatchOperator.class));
+ opvec.add(new OpTuple<CorrelationLocalSimulativeReduceSinkDesc>(CorrelationLocalSimulativeReduceSinkDesc.class,
+ CorrelationLocalSimulativeReduceSinkOperator.class));
}
public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index a2caeed..555e595 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -21,173 +21,43 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
/**
* Reduce Sink Operator sends output to the reduce stage.
**/
-public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
+public class ReduceSinkOperator extends BaseReduceSinkOperator<ReduceSinkDesc>
implements Serializable {
private static final long serialVersionUID = 1L;
- /**
- * The evaluators for the key columns. Key columns decide the sort order on
- * the reducer side. Key columns are passed to the reducer in the "key".
- */
- protected transient ExprNodeEvaluator[] keyEval;
- /**
- * The evaluators for the value columns. Value columns are passed to reducer
- * in the "value".
- */
- protected transient ExprNodeEvaluator[] valueEval;
- /**
- * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
- * Hive language). Partition columns decide the reducer that the current row
- * goes to. Partition columns are not passed to reducer.
- */
- protected transient ExprNodeEvaluator[] partitionEval;
-
- // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
- // ready
- transient Serializer keySerializer;
- transient boolean keyIsText;
- transient Serializer valueSerializer;
- transient int tag;
- transient byte[] tagByte = new byte[1];
- transient protected int numDistributionKeys;
- transient protected int numDistinctExprs;
-
- @Override
- protected void initializeOp(Configuration hconf) throws HiveException {
-
- try {
- keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
- int i = 0;
- for (ExprNodeDesc e : conf.getKeyCols()) {
- keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- numDistributionKeys = conf.getNumDistributionKeys();
- distinctColIndices = conf.getDistinctColumnIndices();
- numDistinctExprs = distinctColIndices.size();
-
- valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
- i = 0;
- for (ExprNodeDesc e : conf.getValueCols()) {
- valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
- i = 0;
- for (ExprNodeDesc e : conf.getPartitionCols()) {
- partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- tag = conf.getTag();
- tagByte[0] = (byte) tag;
- LOG.info("Using tag = " + tag);
-
- TableDesc keyTableDesc = conf.getKeySerializeInfo();
- keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
- .newInstance();
- keySerializer.initialize(null, keyTableDesc.getProperties());
- keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+ private final ArrayList<Integer> operationPathTags = new ArrayList<Integer>(); // operation path tags
+ private final byte[] operationPathTagsByte = new byte[1];
- TableDesc valueTableDesc = conf.getValueSerializeInfo();
- valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
- .newInstance();
- valueSerializer.initialize(null, valueTableDesc.getProperties());
-
- firstRow = true;
- initializeChildren(hconf);
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
+ public void setOperationPathTags(ArrayList<Integer> operationPathTags) {
+ this.operationPathTags.addAll(operationPathTags);
+ int operationPathTagsInt = 0;
+ int tmp = 1;
+ for (Integer operationPathTag : operationPathTags) {
+ operationPathTagsInt += tmp << operationPathTag.intValue();
}
+ operationPathTagsByte[0] = (byte) operationPathTagsInt;
}
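+ // Example (hypothetical values): setOperationPathTags({0, 2}) produces the byte 0b00000101;
+ // CorrelationReducerDispatchOperator tests these bits on the reduce side to decide which
+ // operation paths should receive the row.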
- transient InspectableObject tempInspectableObject = new InspectableObject();
- transient HiveKey keyWritable = new HiveKey();
- transient Writable value;
-
- transient StructObjectInspector keyObjectInspector;
- transient StructObjectInspector valueObjectInspector;
- transient ObjectInspector[] partitionObjectInspectors;
-
- transient Object[][] cachedKeys;
- transient Object[] cachedValues;
- transient List> distinctColIndices;
-
- boolean firstRow;
-
- transient Random random;
-
- /**
- * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
- * column indices for group by.
- * Puts the return values into a StructObjectInspector with output column
- * names.
- *
- * If distinctColIndices is empty, the object inspector is same as
- * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
- */
- protected static StructObjectInspector initEvaluatorsAndReturnStruct(
- ExprNodeEvaluator[] evals, List> distinctColIndices,
- List outputColNames,
- int length, ObjectInspector rowInspector)
- throws HiveException {
- int inspectorLen = evals.length > length ? length + 1 : evals.length;
- List sois = new ArrayList(inspectorLen);
-
- // keys
- ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
- sois.addAll(Arrays.asList(fieldObjectInspectors));
-
- if (evals.length > length) {
- // union keys
- List uois = new ArrayList();
- for (List distinctCols : distinctColIndices) {
- List names = new ArrayList();
- List eois = new ArrayList();
- int numExprs = 0;
- for (int i : distinctCols) {
- names.add(HiveConf.getColumnInternalName(numExprs));
- eois.add(evals[i].initialize(rowInspector));
- numExprs++;
- }
- uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
- }
- UnionObjectInspector uoi =
- ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
- sois.add(uoi);
- }
- return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois );
+ public ArrayList<Integer> getOperationPathTags() {
+ return this.operationPathTags;
}
-
+
@Override
public void processOp(Object row, int tag) throws HiveException {
try {
@@ -267,9 +137,18 @@ public class ReduceSinkOperator extends TerminalOperator
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.setSize(keyLength + 1);
+ } else {
+ keyWritable.setSize(keyLength + 2);
+ }
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.get()[keyLength] = tagByte[0];
+ } else {
+ keyWritable.get()[keyLength] = operationPathTagsByte[0];
+ keyWritable.get()[keyLength + 1] = tagByte[0];
+ }
}
} else {
// Must be BytesWritable
@@ -279,9 +158,18 @@ public class ReduceSinkOperator extends TerminalOperator
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.setSize(keyLength + 1);
+ } else {
+ keyWritable.setSize(keyLength + 2);
+ }
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.get()[keyLength] = tagByte[0];
+ } else {
+ keyWritable.get()[keyLength] = operationPathTagsByte[0];
+ keyWritable.get()[keyLength + 1] = tagByte[0];
+ }
}
}
keyWritable.setHashCode(keyHashCode);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 1a40630..131f640 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -583,7 +583,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator imp
}
@Override
- protected boolean allInitializedParentsAreClosed() {
+ public boolean allInitializedParentsAreClosed() {
return true;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index dffdd7b..1881f9e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -80,6 +80,9 @@ public class TableScanOperator extends Operator implements
if (conf != null && conf.isGatherStats()) {
gatherStats(row);
}
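+ // When row-number forwarding is enabled, advance this operator's row number for every row
+ // it forwards.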
+ if (conf != null && conf.isForwardRowNumber()) {
+ setRowNumber(rowNumber+1);
+ }
forward(row, inputObjInspectors[tag]);
}
@@ -169,6 +172,12 @@ public class TableScanOperator extends Operator implements
if (conf == null) {
return;
}
+
+ LOG.info(this.getName() + " forward row number " + conf.isForwardRowNumber());
+ if (conf.isForwardRowNumber()) {
+ initializeRowNumber();
+ }
+
if (!conf.isGatherStats()) {
return;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
new file mode 100644
index 0000000..eae531c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
@@ -0,0 +1,964 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBExpr;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * Implementation of the correlation optimizer. The optimization is based on the paper
+ * "YSmart: Yet Another SQL-to-MapReduce Translator"
+ * (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang).
+ * This optimizer first detects three kinds of correlations, Input Correlation (IC),
+ * Transit Correlation (TC) and Job Flow Correlation (JFC), and then merges correlated
+ * MapReduce jobs (MR-jobs) into one MR-job.
+ * Correlation detection and query plan tree transformation form the last transformation in
+ * the {@link Optimizer}. Since opColumnExprMap, opParseCtx and opRowResolver may be changed
+ * by other optimizers, the correlation optimizer currently has two phases. The first phase
+ * is the first transformation in the {@link Optimizer} and records the original
+ * opColumnExprMap, opParseCtx and opRowResolver. The second phase (the last transformation)
+ * then performs correlation detection and query plan tree transformation.
+ *
+ * Correlations: for the definitions of correlations, see the original paper.
+ *
+ * Rules: the rules for merging correlated MR-jobs implemented in this correlation optimizer
+ * are: (1) if an MR-job for a Join operation has the same partitioning keys as all of its
+ * preceding MR-jobs, the correlation optimizer merges these MR-jobs into one MR-job;
+ * (2) if an MR-job for a GroupBy and Aggregation operation has the same partitioning keys
+ * as its preceding MR-job, the correlation optimizer merges these two MR-jobs into one
+ * MR-job. Note: in the current implementation, if the correlation optimizer detects that
+ * the MR-jobs of a sub-plan tree are correlated, it transforms the sub-plan tree into a
+ * single MR-job only when the input of the sub-plan tree is not a temporary table;
+ * otherwise, the current implementation ignores the sub-plan tree.
+ *
+ * Future work: several extensions would enhance the correlation optimizer, for example:
+ * (1) add a rule that merges two MR-jobs into a single MR-job when they share the same
+ * partitioning keys and have common input tables; (2) relax the condition of identical
+ * partitioning keys to common partitioning keys; (3) support aggregation functions with a
+ * DISTINCT keyword, which the current implementation cannot optimize, and optimize queries
+ * that involve self-joins.
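+ *
+ * Example (illustrative only): for a query such as
+ *   SELECT t1.key, count(*) FROM t1 JOIN t2 ON (t1.key = t2.key) GROUP BY t1.key,
+ * the join and the aggregation are both partitioned on t1.key, so their MR-jobs have a
+ * job flow correlation and can be evaluated in a single MR-job.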
+ */
+
+public class CorrelationOptimizer implements Transform {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName());
+ private final HashMap<String, String> AliastoTabName;
+ private final HashMap<String, Table> AliastoTab;
+
+ public CorrelationOptimizer() {
+ super();
+ AliastoTabName = new HashMap<String, String>();
+ AliastoTab = new HashMap<String, Table>();
+ pGraphContext = null;
+ }
+
+ private boolean initializeAliastoTabNameMapping(QB qb) {
+ // If any sub-query's qb is null, CorrelationOptimizer will not optimize this query.
+ // e.g. auto_join27.q
+ if (qb == null) {
+ return false;
+ }
+ boolean ret = true;
+ for (String alias : qb.getAliases()) {
+ AliastoTabName.put(alias, qb.getTabNameForAlias(alias));
+ AliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias));
+ }
+ for (String subqalias : qb.getSubqAliases()) {
+ QBExpr qbexpr = qb.getSubqForAlias(subqalias);
+ ret = ret && initializeAliastoTabNameMapping(qbexpr.getQB());
+ }
+ return ret;
+ }
+
+ protected ParseContext pGraphContext;
+ private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
+ private final LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx =
+ new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
+ private final LinkedHashMap<Operator<? extends OperatorDesc>, RowResolver> originalOpRowResolver =
+ new LinkedHashMap<Operator<? extends OperatorDesc>, RowResolver>();
+ private final LinkedHashMap<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap =
+ new LinkedHashMap<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>>();
+
+ private boolean isPhase1 = true;
+ private boolean hasMultipleFileSinkOperators = false;
+
+ private Map<ReduceSinkOperator, GroupByOperator> groupbyNonMapSide2MapSide;
+ private Map<GroupByOperator, ReduceSinkOperator> groupbyMapSide2NonMapSide;
+
+ /**
+ * Transform the query plan tree. First, detect correlations between operators; then group
+ * the correlated operators and transform the plan accordingly.
+ *
+ * @param pctx
+ * current parse context
+ * @throws SemanticException
+ */
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ if (isPhase1) {
+ pGraphContext = pctx;
+ opParseCtx = pctx.getOpParseCtx();
+
+ CorrelationNodePhase1ProcCtx phase1ProcCtx = new CorrelationNodePhase1ProcCtx();
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules,
+ phase1ProcCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topOp nodes
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ isPhase1 = false;
+ hasMultipleFileSinkOperators = phase1ProcCtx.fileSinkOperatorCount > 1;
+ } else {
+ /*
+ * Types of correlations:
+ * 1) Input Correlation: Multiple nodes have input correlation
+ * (IC) if their input relation sets are not disjoint;
+ * 2) Transit Correlation: Multiple nodes have transit correlation
+ * (TC) if they have not only input correlation, but
+ * also the same partition key;
+ * 3) Job Flow Correlation: A node has job flow correlation
+ * (JFC) with one of its child nodes if it has the same
+ * partition key as that child node.
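+ *
+ * For example (illustrative): two aggregations that both read table t1 have input
+ * correlation; if they also partition on the same column t1.key, they have transit
+ * correlation; and a join on t1.key that feeds an aggregation on t1.key gives the
+ * two jobs a job flow correlation.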
+ */
+
+ pGraphContext = pctx;
+ if (hasMultipleFileSinkOperators) {
+ //TODO: handle queries with multiple FileSinkOperators;
+ return pGraphContext;
+ }
+
+
+ opParseCtx = pctx.getOpParseCtx();
+
+ groupbyNonMapSide2MapSide = pctx.getGroupbyNonMapSide2MapSide();
+ groupbyMapSide2NonMapSide = pctx.getGroupbyMapSide2NonMapSide();
+
+ QB qb = pGraphContext.getQB();
+ boolean cannotHandle = !initializeAliastoTabNameMapping(qb);
+ if (cannotHandle) {
+ LOG.info("This query or its sub-queries has a null qb. " +
+ "Will not try to optimize it.");
+ return pGraphContext;
+ }
+
+ // 0: Replace all map-side group by patterns (GBY-RS-GBY) with
+ // non-map-side group by patterns (RS-GBY) if necessary
+ if (pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ for (Entry<GroupByOperator, ReduceSinkOperator> entry :
+ groupbyMapSide2NonMapSide.entrySet()) {
+ GroupByOperator mapSidePatternStart = entry.getKey();
+ GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart
+ .getChildOperators().get(0).getChildOperators().get(0);
+ ReduceSinkOperator nonMapSidePatternStart = entry.getValue();
+ GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart
+ .getChildOperators().get(0);
+
+ List<Operator<? extends OperatorDesc>> parents = mapSidePatternStart.getParentOperators();
+ List<Operator<? extends OperatorDesc>> children = mapSidePatternEnd.getChildOperators();
+
+ nonMapSidePatternStart.setParentOperators(parents);
+ nonMapSidePatternEnd.setChildOperators(children);
+
+ for (Operator<? extends OperatorDesc> parent : parents) {
+ parent.replaceChild(mapSidePatternStart, nonMapSidePatternStart);
+ }
+ for (Operator<? extends OperatorDesc> child : children) {
+ child.replaceParent(mapSidePatternEnd, nonMapSidePatternEnd);
+ }
+ addOperatorInfo(nonMapSidePatternStart);
+ addOperatorInfo(nonMapSidePatternEnd);
+ }
+ }
+
+ // 1: detect correlations
+ CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx();
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"),
+ new CorrelationNodeProc());
+
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topOp nodes
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+
+ // 2: transform the query plan tree
+ LOG.info("Begain query plan transformation based on intra-query correlations. " +
+ correlationCtx.getCorrelations().size() + " correlation(s) to be applied");
+ int correlationsAppliedCount = 0;
+ for (IntraQueryCorrelation correlation : correlationCtx.getCorrelations()) {
+ boolean ret = CorrelationOptimizerUtils.applyCorrelation(
+ correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver,
+ groupbyNonMapSide2MapSide, originalOpParseCtx);
+ if (ret) {
+ correlationsAppliedCount++;
+ }
+ }
+
+ // 3: if no correlation was applied, replace all non-map-side group by patterns (RS-GBY)
+ // back with map-side group by patterns (GBY-RS-GBY) if necessary
+ if (correlationsAppliedCount == 0 &&
+ pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ for (Entry<ReduceSinkOperator, GroupByOperator> entry :
+ groupbyNonMapSide2MapSide.entrySet()) {
+ GroupByOperator mapSidePatternStart = entry.getValue();
+ GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart
+ .getChildOperators().get(0).getChildOperators().get(0);
+ ReduceSinkOperator nonMapSidePatternStart = entry.getKey();
+ GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart
+ .getChildOperators().get(0);
+
+ List<Operator<? extends OperatorDesc>> parents = nonMapSidePatternStart.getParentOperators();
+ List<Operator<? extends OperatorDesc>> children = nonMapSidePatternEnd.getChildOperators();
+
+ mapSidePatternStart.setParentOperators(parents);
+ mapSidePatternEnd.setChildOperators(children);
+
+ for (Operator<? extends OperatorDesc> parent : parents) {
+ parent.replaceChild(nonMapSidePatternStart, mapSidePatternStart);
+ }
+ for (Operator<? extends OperatorDesc> child : children) {
+ child.replaceParent(nonMapSidePatternEnd, mapSidePatternEnd);
+ }
+ }
+ }
+ LOG.info("Finish query plan transformation based on intra-query correlations. " +
+ correlationsAppliedCount + " correlation(s) actually be applied");
+ }
+ return pGraphContext;
+ }
+
+ private void addOperatorInfo(Operator<? extends OperatorDesc> op) {
+ OpParseContext opCtx = opParseCtx.get(op);
+ if (op.getColumnExprMap() != null) {
+ if (!originalOpColumnExprMap.containsKey(op)) {
+ originalOpColumnExprMap.put(op, op.getColumnExprMap());
+ }
+ }
+ if (opCtx != null) {
+ if (!originalOpParseCtx.containsKey(op)) {
+ originalOpParseCtx.put(op, opCtx);
+ }
+ if (opCtx.getRowResolver() != null) {
+ if (!originalOpRowResolver.containsKey(op)) {
+ originalOpRowResolver.put(op, opCtx.getRowResolver());
+ }
+ }
+ }
+ }
+
+ private NodeProcessor getPhase1DefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ addOperatorInfo(op);
+
+ if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+ ((CorrelationNodePhase1ProcCtx)procCtx).fileSinkOperatorCount++;
+ }
+ return null;
+ }
+ };
+ }
+
+ private class CorrelationNodeProc implements NodeProcessor {
+
+ public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop) {
+ Operator<? extends OperatorDesc> op = rsop.getChildOperators().get(0);
+ while (!op.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ if (op.getName().equals(FileSinkOperator.getOperatorName())) {
+ return null;
+ }
+ assert op.getChildOperators().size() <= 1;
+ op = op.getChildOperators().get(0);
+ }
+ return (ReduceSinkOperator) op;
+ }
+
+ private HashSet<ReduceSinkOperator> findCorrelatedReduceSinkOperators(
+ Operator<? extends OperatorDesc> op, HashSet<String> keyColumns,
+ IntraQueryCorrelation correlation) throws Exception {
+
+ LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName());
+
+ HashSet<ReduceSinkOperator> correlatedReduceSinkOps = new HashSet<ReduceSinkOperator>();
+ if (op.getParentOperators() == null) {
+ return correlatedReduceSinkOps;
+ }
+ if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)) {
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(
+ parent, keyColumns, correlation));
+ }
+ } else if (originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)) {
+ HashSet<String> newKeyColumns = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ ExprNodeDesc col = originalOpColumnExprMap.get(op).get(keyColumn);
+ if (col instanceof ExprNodeColumnDesc) {
+ newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn());
+ }
+ }
+
+ if (op.getName().equals(CommonJoinOperator.getOperatorName())) {
+ HashSet<String> tableNeedToCheck = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ for (ColumnInfo cinfo : originalOpParseCtx.get(op).getRowResolver().getColumnInfos()) {
+ if (keyColumn.equals(cinfo.getInternalName())) {
+ tableNeedToCheck.add(cinfo.getTabAlias());
+ }
+ }
+ }
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ Set<String> tableNames =
+ originalOpParseCtx.get(parent).getRowResolver().getTableNames();
+ for (String tbl : tableNames) {
+ if (tableNeedToCheck.contains(tbl)) {
+ correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(parent,
+ newKeyColumns, correlation));
+ }
+ }
+ }
+ } else {
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(
+ parent, newKeyColumns, correlation));
+ }
+ }
+ } else if (originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator) {
+ HashSet<String> newKeyColumns = new HashSet<String>();
+ for (String keyColumn : keyColumns) {
+ ExprNodeDesc col = originalOpColumnExprMap.get(op).get(keyColumn);
+ if (col instanceof ExprNodeColumnDesc) {
+ newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn());
+ }
+ }
+
+ ReduceSinkOperator rsop = (ReduceSinkOperator) op;
+ HashSet<String> thisKeyColumns = new HashSet<String>();
+ for (ExprNodeDesc key : rsop.getConf().getKeyCols()) {
+ if (key instanceof ExprNodeColumnDesc) {
+ thisKeyColumns.add(((ExprNodeColumnDesc) key).getColumn());
+ }
+ }
+
+ boolean isCorrelated = false;
+ Set<String> intersection = new HashSet<String>(newKeyColumns);
+ intersection.retainAll(thisKeyColumns);
+ // TODO: should use whether the intersection is empty to evaluate whether the two
+ // corresponding operators are correlated
+ // isCorrelated = !(intersection.isEmpty());
+ isCorrelated = (intersection.size() == thisKeyColumns.size() && !intersection.isEmpty());
+
+ ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop);
+ // Since the search starts from the ReduceSinkOperators near the FileSinkOperator,
+ // a child ReduceSinkOperator can always be found from this operator.
+ assert nextChildReduceSinkOperator != null;
+ if (isCorrelated) {
+ if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals(
+ CommonJoinOperator.getOperatorName())) {
+ if (intersection.size() != nextChildReduceSinkOperator.getConf().getKeyCols().size() ||
+ intersection.size() != rsop.getConf().getKeyCols().size()) {
+ // Right now, we can only handle identical join keys.
+ isCorrelated = false;
+ }
+ }
+ }
+
+ if (isCorrelated) {
+ LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is correlated");
+ LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+ LOG.info("--keys of child operator: " + keyColumns.toString());
+ LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+ if (((Operator<? extends OperatorDesc>) (op.getChildOperators().get(0))).getName()
+ .equals(CommonJoinOperator.getOperatorName())) {
+ ArrayList<ReduceSinkOperator> peers =
+ CorrelationOptimizerUtils.findPeerReduceSinkOperators(rsop);
+ correlatedReduceSinkOps.addAll(peers);
+ } else {
+ correlatedReduceSinkOps.add(rsop);
+ }
+ // this if block is useful when we use "isCorrelated = !(intersection.isEmpty());" for
+ // the evaluation of isCorrelated
+ if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals(
+ GroupByOperator.getOperatorName()) &&
+ (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())) {
+ LOG.info("--found a RS-GBY pattern that needs to be replaced to GBY-RS-GBY patterns. "
+ + " The number of common keys is "
+ + intersection.size()
+ + ", and the number of keys of next group by operator"
+ + nextChildReduceSinkOperator.getConf().getKeyCols().size());
+ correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator);
+ }
+ } else {
+ LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is not correlated");
+ LOG.info("--keys of this operator: " + thisKeyColumns.toString());
+ LOG.info("--keys of child operator: " + keyColumns.toString());
+ LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString());
+ correlatedReduceSinkOps.clear();
+ correlation.getRSGBYToBeReplacedByGBYRSGBY().clear();
+ }
+ } else {
+ throw new Exception("Correlation optimizer: ReduceSinkOperator " + op.getIdentifier()
+ + " does not have ColumnExprMap");
+ }
+ return correlatedReduceSinkOps;
+ }
+
+ private HashSet<ReduceSinkOperator> exploitJFC(ReduceSinkOperator op,
+ CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation) {
+
+ correlationCtx.addWalked(op);
+ correlation.addToAllReduceSinkOperators(op);
+
+ HashSet<ReduceSinkOperator> reduceSinkOperators = new HashSet<ReduceSinkOperator>();
+
+ boolean shouldDetect = true;
+
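+ // Correlation detection is attempted only when every partitioning key of this
+ // ReduceSinkOperator is a plain column reference; otherwise the keys cannot be traced
+ // back through the parent operators.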
+ ArrayList<ExprNodeDesc> keys = op.getConf().getKeyCols();
+ HashSet<String> keyColumns = new HashSet<String>();
+ for (ExprNodeDesc key : keys) {
+ if (!(key instanceof ExprNodeColumnDesc)) {
+ shouldDetect = false;
+ } else {
+ keyColumns.add(((ExprNodeColumnDesc) key).getColumn());
+ }
+ }
+
+ if (shouldDetect) {
+ HashSet<ReduceSinkOperator> newReduceSinkOperators = new HashSet<ReduceSinkOperator>();
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ try {
+ LOG.info("Operator " + op.getIdentifier()
+ + ": start detecting correlation from this operator");
+ LOG.info("--keys of this operator: " + keyColumns.toString());
+ HashSet<ReduceSinkOperator> correlatedReduceSinkOperators =
+ findCorrelatedReduceSinkOperators(parent, keyColumns, correlation);
+ if (correlatedReduceSinkOperators.size() == 0) {
+ newReduceSinkOperators.add(op);
+ } else {
+ for (ReduceSinkOperator rsop : correlatedReduceSinkOperators) {
+
+ // For two ReduceSinkOperators, we say the one closer to the FileSinkOperators is "up" and
+ // the other one is "down"
+
+ if (!correlation.getUp2downRSops().containsKey(op)) {
+ correlation.getUp2downRSops().put(op, new ArrayList<ReduceSinkOperator>());
+ }
+ correlation.getUp2downRSops().get(op).add(rsop);
+
+ if (!correlation.getDown2upRSops().containsKey(rsop)) {
+ correlation.getDown2upRSops().put(rsop, new ArrayList<ReduceSinkOperator>());
+ }
+ correlation.getDown2upRSops().get(rsop).add(op);
+ HashSet<ReduceSinkOperator> exploited = exploitJFC(rsop, correlationCtx,
+ correlation);
+ if (exploited.size() == 0) {
+ newReduceSinkOperators.add(rsop);
+ } else {
+ newReduceSinkOperators.addAll(exploited);
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Failed while detecting correlated ReduceSinkOperators", e);
+ }
+ }
+ reduceSinkOperators.addAll(newReduceSinkOperators);
+ }
+ return reduceSinkOperators;
+ }
+
+ private TableScanOperator findTableScanOperator(Operator<? extends OperatorDesc> startPoint) {
+ Operator<? extends OperatorDesc> thisOp = startPoint.getParentOperators().get(0);
+ while (true) {
+ if (thisOp.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ return null;
+ } else if (thisOp.getName().equals(TableScanOperator.getOperatorName())) {
+ return (TableScanOperator) thisOp;
+ } else {
+ if (thisOp.getParentOperators() != null) {
+ thisOp = thisOp.getParentOperators().get(0);
+ } else {
+ break;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void annotateOpPlan(IntraQueryCorrelation correlation) {
+ HashMap<ReduceSinkOperator, Integer> bottomReduceSink2OperationPath =
+ new HashMap<ReduceSinkOperator, Integer>();
+ int indx = 0;
+ for (ReduceSinkOperator rsop : correlation.getBottomReduceSinkOperators()) {
+ if (!bottomReduceSink2OperationPath.containsKey(rsop)) {
+ bottomReduceSink2OperationPath.put(rsop, indx);
+ for (ReduceSinkOperator peerRSop : CorrelationOptimizerUtils
+ .findPeerReduceSinkOperators(rsop)) {
+ if (correlation.getBottomReduceSinkOperators().contains(peerRSop)) {
+ bottomReduceSink2OperationPath.put(peerRSop, indx);
+ }
+ }
+ indx++;
+ }
+ }
+ correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OperationPath);
+ }
+
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " "
+ + ((Operator) nd).getName());
+ addOperatorInfo((Operator<? extends OperatorDesc>) nd);
+
+ CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx;
+
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+
+ if (correlationCtx.isWalked(op)) {
+ return null;
+ }
+
+ if (op.getConf().getKeyCols().size() == 0 ||
+ (!op.getChildOperators().get(0).getName().equals(CommonJoinOperator.getOperatorName()) &&
+ !op.getChildOperators().get(0).getName().equals(GroupByOperator.getOperatorName()))) {
+ correlationCtx.addWalked(op);
+ return null;
+ }
+
+ // 1: find out correlation
+ IntraQueryCorrelation correlation = new IntraQueryCorrelation();
+ ArrayList<ReduceSinkOperator> peerReduceSinkOperators =
+ CorrelationOptimizerUtils.findPeerReduceSinkOperators(op);
+ ArrayList<ReduceSinkOperator> bottomReduceSinkOperators = new ArrayList<ReduceSinkOperator>();
+ for (ReduceSinkOperator rsop : peerReduceSinkOperators) {
+ HashSet<ReduceSinkOperator> thisBottomReduceSinkOperators = exploitJFC(rsop,
+ correlationCtx, correlation);
+
+ // TODO: if "isCorrelated = !(intersection.isEmpty());" is used in the method
+ // findCorrelatedReduceSinkOperators to evaluate isCorrelated, uncomment the following
+ // if block. The ReduceSinkOperator at the top level needs special care, since we cannot
+ // evaluate the relationship between its set of keys and that of its next parent
+ // ReduceSinkOperator.
+ // if (peerReduceSinkOperators.size() == 1) {
+ // correlation.addToRSGBYToBeReplacedByGBYRSGBY(rsop);
+ // }
+ if (thisBottomReduceSinkOperators.size() == 0) {
+ thisBottomReduceSinkOperators.add(rsop);
+ } else {
+ boolean isClear = false;
+ // bottom ReduceSinkOperators are those ReduceSinkOperators which are close to
+ // TableScanOperators
+ for (ReduceSinkOperator bottomRsop : thisBottomReduceSinkOperators) {
+ TableScanOperator tsop = findTableScanOperator(bottomRsop);
+ if (tsop == null) {
+ isClear = true; // currently the optimizer can only optimize correlations involving
+ // source tables (input tables)
+ } else {
+ // top ReduceSinkOperators are those ReduceSinkOperators which are close to
+ // FileSinkOperators
+ if (!correlation.getTop2TSops().containsKey(rsop)) {
+ correlation.getTop2TSops().put(rsop, new ArrayList<TableScanOperator>());
+ }
+ correlation.getTop2TSops().get(rsop).add(tsop);
+
+ if (!correlation.getBottom2TSops().containsKey(bottomRsop)) {
+ correlation.getBottom2TSops().put(bottomRsop, new ArrayList<TableScanOperator>());
+ }
+ correlation.getBottom2TSops().get(bottomRsop).add(tsop);
+ }
+ }
+ if (isClear) {
+ thisBottomReduceSinkOperators.clear();
+ thisBottomReduceSinkOperators.add(rsop);
+ }
+ }
+ bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators);
+ }
+
+ if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) {
+ LOG.info("has job flow correlation");
+ correlation.setJobFlowCorrelation(true);
+ correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators);
+ annotateOpPlan(correlation);
+ }
+
+ if (correlation.hasJobFlowCorrelation()) {
+ boolean hasICandTC = findICandTC(correlation);
+ LOG.info("has input correlation and transit correlation? " + hasICandTC);
+ correlation.setInputCorrelation(hasICandTC);
+ correlation.setTransitCorrelation(hasICandTC);
+ boolean hasSelfJoin = hasSelfJoin(correlation);
+ LOG.info("has self-join? " + hasSelfJoin);
+ correlation.setInvolveSelfJoin(hasSelfJoin);
+ // TODO: support self-join involved cases. For self-join related operation paths, after the
+ // correlation dispatch operator, each path should be filtered by a filter operator
+ if (!hasSelfJoin) {
+ LOG.info("correlation detected");
+ correlationCtx.addCorrelation(correlation);
+ } else {
+ LOG.info("correlation discarded. The current optimizer cannot optimize it");
+ }
+ }
+ correlationCtx.addWalkedAll(peerReduceSinkOperators);
+ return null;
+ }
+
+ private boolean hasSelfJoin(IntraQueryCorrelation correlation) {
+ boolean hasSelfJoin = false;
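+ // A self-join is present when two peer ReduceSinkOperators (operators feeding the same
+ // join) read from the same table; such plans are not optimized yet.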
+ for (Entry<String, ArrayList<ReduceSinkOperator>> entry : correlation
+ .getTable2CorrelatedRSops().entrySet()) {
+ for (ReduceSinkOperator rsop : entry.getValue()) {
+ HashSet<ReduceSinkOperator> intersection = new HashSet<ReduceSinkOperator>(
+ CorrelationOptimizerUtils.findPeerReduceSinkOperators(rsop));
+ intersection.retainAll(entry.getValue());
+ // if self-join is involved
+ if (intersection.size() > 1) {
+ hasSelfJoin = true;
+ return hasSelfJoin;
+ }
+ }
+ }
+ return hasSelfJoin;
+ }
+
+ private boolean findICandTC(IntraQueryCorrelation correlation) {
+
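+ // Group the bottom ReduceSinkOperators by the source table they read; if any table feeds
+ // more than one of them, the operation paths share input (IC) and, together with the job
+ // flow correlation already detected, transit correlation (TC).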
+ boolean hasICandTC = false;
+ HashMap<String, ArrayList<ReduceSinkOperator>> table2RSops =
+ new HashMap<String, ArrayList<ReduceSinkOperator>>();
+ HashMap<String, ArrayList<TableScanOperator>> table2TSops =
+ new HashMap<String, ArrayList<TableScanOperator>>();
+
+ for (Entry<ReduceSinkOperator, ArrayList<TableScanOperator>> entry : correlation
+ .getBottom2TSops().entrySet()) {
+ String tbl = AliastoTabName.get(entry.getValue().get(0).getConf().getAlias());
+ if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)) {
+ table2RSops.put(tbl, new ArrayList<ReduceSinkOperator>());
+ table2TSops.put(tbl, new ArrayList<TableScanOperator>());
+ }
+ assert entry.getValue().size() == 1;
+ table2RSops.get(tbl).add(entry.getKey());
+ table2TSops.get(tbl).add(entry.getValue().get(0));
+ }
+
+ for (Entry<String, ArrayList<ReduceSinkOperator>> entry : table2RSops.entrySet()) {
+ if (entry.getValue().size() > 1) {
+ hasICandTC = true;
+ break;
+ }
+ }
+ correlation.setICandTCCorrelation(table2RSops, table2TSops);
+ return hasICandTC;
+ }
+ }
+
+ private NodeProcessor getDefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " "
+ + ((Operator) nd).getName());
+ addOperatorInfo((Operator<? extends OperatorDesc>) nd);
+ return null;
+ }
+ };
+ }
+
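+ /**
+ * IntraQueryCorrelation records one detected intra-query correlation: the correlated top
+ * and bottom ReduceSinkOperators, the TableScanOperators each operation path reads from,
+ * and the dispatch configuration used when the correlated MR-jobs are merged into one.
+ */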
+ public class IntraQueryCorrelation {
+
+ private final HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>> down2upRSops =
+ new HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>>();
+ private final HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>> up2downRSops =
+ new HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>>();
+
+ private final HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>> top2TSops =
+ new HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>>();
+ private final HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>> bottom2TSops =
+ new HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>>();
+
+ private ArrayList<ReduceSinkOperator> topReduceSinkOperators;
+ private ArrayList<ReduceSinkOperator> bottomReduceSinkOperators;
+
+ private HashMap<String, ArrayList<ReduceSinkOperator>> table2CorrelatedRSops;
+
+ private HashMap<String, ArrayList<TableScanOperator>> table2CorrelatedTSops;
+
+ private HashMap<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap;
+
+ private final HashMap<Integer, HashMap<Integer, ArrayList<Integer>>> dispatchConf =
+ new HashMap<Integer, HashMap<Integer, ArrayList<Integer>>>(); // inputTag->(Child->outputTag)
+ private final HashMap<Integer, HashMap<Integer, ArrayList<SelectDesc>>> dispatchValueSelectDescConf =
+ new HashMap<Integer, HashMap<Integer, ArrayList<SelectDesc>>>(); // inputTag->(Child->SelectDesc)
+ private final HashMap<Integer, HashMap<Integer, ArrayList<SelectDesc>>> dispatchKeySelectDescConf =
+ new HashMap<Integer, HashMap<Integer, ArrayList<SelectDesc>>>(); // inputTag->(Child->SelectDesc)
+
+ private final HashSet<ReduceSinkOperator> allReduceSinkOperators =
+ new HashSet<ReduceSinkOperator>();
+
+ // this set contains all ReduceSink-GroupBy operator pairs that should be replaced by the
+ // GroupBy-ReduceSink-GroupBy pattern.
+ // the first GroupByOperator is hash-based and is used to group records.
+ private final HashSet<ReduceSinkOperator> rSGBYToBeReplacedByGBYRSGBY =
+ new HashSet<ReduceSinkOperator>();
+
+ public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop) {
+ rSGBYToBeReplacedByGBYRSGBY.add(rsop);
+ }
+
+ public HashSet<ReduceSinkOperator> getRSGBYToBeReplacedByGBYRSGBY() {
+ return rSGBYToBeReplacedByGBYRSGBY;
+ }
+
+ public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) {
+ allReduceSinkOperators.add(rsop);
+ }
+
+ public HashSet<ReduceSinkOperator> getAllReduceSinkOperators() {
+ return allReduceSinkOperators;
+ }
+
+ public HashMap<Integer, HashMap<Integer, ArrayList<Integer>>> getDispatchConf() {
+ return dispatchConf;
+ }
+
+ public HashMap<Integer, HashMap<Integer, ArrayList<SelectDesc>>> getDispatchValueSelectDescConf() {
+ return dispatchValueSelectDescConf;
+ }
+
+ public HashMap<Integer, HashMap<Integer, ArrayList<SelectDesc>>> getDispatchKeySelectDescConf() {
+ return dispatchKeySelectDescConf;
+ }
+
+ public void addOperationPathToDispatchConf(Integer opPlan) {
+ if (!dispatchConf.containsKey(opPlan)) {
+ dispatchConf.put(opPlan, new HashMap<Integer, ArrayList<Integer>>());
+ }
+ }
+
+ public HashMap<Integer, ArrayList<Integer>> getDispatchConfForOperationPath(Integer opPlan) {
+ return dispatchConf.get(opPlan);
+ }
+
+ public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) {
+ if (!dispatchValueSelectDescConf.containsKey(opPlan)) {
+ dispatchValueSelectDescConf.put(opPlan, new HashMap<Integer, ArrayList<SelectDesc>>());
+ }
+ }
+
+ public HashMap<Integer, ArrayList<SelectDesc>> getDispatchValueSelectDescConfForOperationPath(
+ Integer opPlan) {
+ return dispatchValueSelectDescConf.get(opPlan);
+ }
+
+ public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) {
+ if (!dispatchKeySelectDescConf.containsKey(opPlan)) {
+ dispatchKeySelectDescConf.put(opPlan, new HashMap<Integer, ArrayList<SelectDesc>>());
+ }
+ }
+
+ public HashMap<Integer, ArrayList<SelectDesc>> getDispatchKeySelectDescConfForOperationPath(
+ Integer opPlan) {
+ return dispatchKeySelectDescConf.get(opPlan);
+ }
+
+ private boolean inputCorrelation = false;
+ private boolean transitCorrelation = false;
+ private boolean jobFlowCorrelation = false;
+
+ public void setBottomReduceSink2OperationPathMap(
+ HashMap<ReduceSinkOperator, Integer> bottomReduceSink2OperationPathMap) {
+ this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap;
+ }
+
+ public HashMap<ReduceSinkOperator, Integer> getBottomReduceSink2OperationPathMap() {
+ return bottomReduceSink2OperationPathMap;
+ }
+
+ public void setInputCorrelation(boolean inputCorrelation) {
+ this.inputCorrelation = inputCorrelation;
+ }
+
+ public boolean hasInputCorrelation() {
+ return inputCorrelation;
+ }
+
+ public void setTransitCorrelation(boolean transitCorrelation) {
+ this.transitCorrelation = transitCorrelation;
+ }
+
+ public boolean hasTransitCorrelation() {
+ return transitCorrelation;
+ }
+
+ public void setJobFlowCorrelation(boolean jobFlowCorrelation) {
+ this.jobFlowCorrelation = jobFlowCorrelation;
+ }
+
+ public boolean hasJobFlowCorrelation() {
+ return jobFlowCorrelation;
+ }
+
+ public HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>> getTop2TSops() {
+ return top2TSops;
+ }
+
+ public HashMap<ReduceSinkOperator, ArrayList<TableScanOperator>> getBottom2TSops() {
+ return bottom2TSops;
+ }
+
+ public HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>> getDown2upRSops() {
+ return down2upRSops;
+ }
+
+ public HashMap<ReduceSinkOperator, ArrayList<ReduceSinkOperator>> getUp2downRSops() {
+ return up2downRSops;
+ }
+
+ public void setJFCCorrelation(ArrayList<ReduceSinkOperator> peerReduceSinkOperators,
+ ArrayList<ReduceSinkOperator> bottomReduceSinkOperators) {
+ this.topReduceSinkOperators = peerReduceSinkOperators;
+ this.bottomReduceSinkOperators = bottomReduceSinkOperators;
+ }
+
+
+ public ArrayList<ReduceSinkOperator> getTopReduceSinkOperators() {
+ return topReduceSinkOperators;
+ }
+
+ public ArrayList<ReduceSinkOperator> getBottomReduceSinkOperators() {
+ return bottomReduceSinkOperators;
+ }
+
+ public void setICandTCCorrelation(HashMap<String, ArrayList<ReduceSinkOperator>> table2RSops,
+ HashMap<String, ArrayList<TableScanOperator>> table2TSops) {
+ this.table2CorrelatedRSops = table2RSops;
+ this.table2CorrelatedTSops = table2TSops;
+ }
+
+ public HashMap<String, ArrayList<ReduceSinkOperator>> getTable2CorrelatedRSops() {
+ return table2CorrelatedRSops;
+ }
+
+ public HashMap<String, ArrayList<TableScanOperator>> getTable2CorrelatedTSops() {
+ return table2CorrelatedTSops;
+ }
+
+ private boolean isInvolveSelfJoin = false;
+
+ public boolean isInvolveSelfJoin() {
+ return isInvolveSelfJoin;
+ }
+
+ public void setInvolveSelfJoin(boolean isInvolveSelfJoin) {
+ this.isInvolveSelfJoin = isInvolveSelfJoin;
+ }
+
+ }
+
+ private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx {
+ public int fileSinkOperatorCount = 0;
+ }
+
+ private class CorrelationNodeProcCtx implements NodeProcessorCtx {
+
+ private final HashSet<ReduceSinkOperator> walked = new HashSet<ReduceSinkOperator>();
+
+ private final ArrayList<IntraQueryCorrelation> correlations =
+ new ArrayList<IntraQueryCorrelation>();
+
+ public void addCorrelation(IntraQueryCorrelation correlation) {
+ correlations.add(correlation);
+ }
+
+ public ArrayList<IntraQueryCorrelation> getCorrelations() {
+ return correlations;
+ }
+
+ public boolean isWalked(ReduceSinkOperator op) {
+ return walked.contains(op);
+ }
+
+ public void addWalked(ReduceSinkOperator op) {
+ walked.add(op);
+ }
+
+ public void addWalkedAll(Collection<ReduceSinkOperator> c) {
+ walked.addAll(c);
+ }
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
new file mode 100644
index 0000000..65bbd97
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
@@ -0,0 +1,805 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator;
+import org.apache.hadoop.hive.ql.exec.CorrelationLocalSimulativeReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.ForwardDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+
+
+public final class CorrelationOptimizerUtils {
+
+ static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName());
+
+ public static boolean isExisted(ExprNodeDesc expr, ArrayList<ExprNodeDesc> col_list) {
+ for (ExprNodeDesc thisExpr : col_list) {
+ if (expr.getExprString().equals(thisExpr.getExprString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static String getColumnName(Map<String, ExprNodeDesc> opColumnExprMap, ExprNodeDesc expr) {
+ for (Entry<String, ExprNodeDesc> entry : opColumnExprMap.entrySet()) {
+ if (expr.getExprString().equals(entry.getValue().getExprString())) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+
+ public static Operator<? extends OperatorDesc> unionUsedColumnsAndMakeNewSelect(
+ ArrayList<ReduceSinkOperator> rsops,
+ IntraQueryCorrelation correlation,
+ LinkedHashMap<Operator<? extends OperatorDesc>, Map<String, ExprNodeDesc>> originalOpColumnExprMap,
+ TableScanOperator input,
+ ParseContext pGraphContext,
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> originalOpParseCtx) {
+
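+ // Collect the union of the columns that the correlated operation paths need from this
+ // table scan and, unless a FilterOperator forces a select-all, build a single
+ // SelectOperator over the shared TableScanOperator that projects exactly those columns.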
+ ArrayList<String> columnNames = new ArrayList<String>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> col_list = new ArrayList<ExprNodeDesc>();
+ RowResolver out_rwsch = new RowResolver();
+ boolean isSelectAll = false;
+
+ int pos = 0;
+ for (ReduceSinkOperator rsop : rsops) {
+ Operator<? extends OperatorDesc> curr = correlation.getBottom2TSops().get(rsop).get(0)
+ .getChildOperators().get(0);
+ while (true) {
+ if (curr.getName().equals(SelectOperator.getOperatorName())) {
+ SelectOperator selOp = (SelectOperator) curr;
+ if (selOp.getColumnExprMap() != null) {
+ for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
+ ExprNodeDesc expr = entry.getValue();
+ if (!isExisted(expr, col_list)
+ && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap()
+ .containsKey(entry.getKey())) {
+ col_list.add(expr);
+ String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap()
+ .get(entry.getKey());
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = entry.getKey();
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ pos++;
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ }
+ }
+ } else {
+ for (ExprNodeDesc expr : selOp.getConf().getColList()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver()
+ .getInvRslvMap().get(expr.getCols().get(0));
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ }
+ break;
+ } else if (curr.getName().equals(FilterOperator.getOperatorName())) {
+ isSelectAll = true;
+ break;
+ } else if (curr.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ ReduceSinkOperator thisRSop = (ReduceSinkOperator) curr;
+ for (ExprNodeDesc expr : thisRSop.getConf().getKeyCols()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ assert expr.getCols().size() == 1;
+ String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver()
+ .getInvRslvMap().get(columnName);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+ for (ExprNodeDesc expr : thisRSop.getConf().getValueCols()) {
+ if (!isExisted(expr, col_list)) {
+ col_list.add(expr);
+ assert expr.getCols().size() == 1;
+ String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr);
+ String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver()
+ .getInvRslvMap().get(columnName);
+ String tabAlias = colRef[0];
+ String colAlias = colRef[1];
+ String outputName = expr.getCols().get(0);
+ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(
+ outputName, expr.getTypeInfo(), tabAlias, false));
+ columnNames.add(outputName);
+ colExprMap.put(outputName, expr);
+ pos++;
+ }
+ }
+
+ break;
+ } else {
+ curr = curr.getChildOperators().get(0);
+ }
+ }
+ }
+
+ Operator<? extends OperatorDesc> output;
+ if (isSelectAll) {
+ output = input;
+ } else {
+ output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new SelectDesc(col_list, columnNames, false), new RowSchema(
+ out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx());
+ output.setColumnExprMap(colExprMap);
+ output.setChildOperators(Utilities.makeList());
+
+ }
+
+ return output;
+ }
+
+
+ public static Operator<? extends OperatorDesc> putOpInsertMap(
+ Operator<? extends OperatorDesc> op,
+ RowResolver rr, LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx) {
+ OpParseContext ctx = new OpParseContext(rr);
+ opParseCtx.put(op, ctx);
+ op.augmentPlan();
+ return op;
+ }
+
+ public static HashMap<Operator<? extends OperatorDesc>, String> getAliasIDtTopOps(
+ HashMap<String, Operator<? extends OperatorDesc>> topOps) {
+ HashMap<Operator<? extends OperatorDesc>, String> aliasIDtTopOps =
+ new HashMap<Operator<? extends OperatorDesc>, String>();
+ for (Entry