diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5efae89..2dc3886 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -494,6 +494,7 @@ public class HiveConf extends Configuration { HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), + HIVEOPTCORRELATION("hive.optimize.correlation", true), // exploit intra-query correlations // Indexes HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G diff --git conf/hive-site.xml conf/hive-site.xml index dab494e..34ecab9 100644 --- conf/hive-site.xml +++ conf/hive-site.xml @@ -19,4 +19,9 @@ + + hive.optimize.correlation + true + + diff --git data/conf/hive-site.xml data/conf/hive-site.xml index 907d333..496a83c 100644 --- data/conf/hive-site.xml +++ data/conf/hive-site.xml @@ -176,4 +176,9 @@ The default input format, if it is not specified, the system assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always overwrite it - if there is a bug in CombineHiveInputFormat, it can always be manually set to HiveInputFormat. + + hive.optimize.correlation + true + + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java new file mode 100644 index 0000000..aa220a9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.BaseReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * BaseReduceSinkOperator + **/ +public abstract class BaseReduceSinkOperator extends + TerminalOperator + implements Serializable { + + private static final long serialVersionUID = 1L; + protected static final Log LOG = LogFactory.getLog(BaseReduceSinkOperator.class + .getName()); + + /** + * The evaluators for the key columns. Key columns decide the sort order on + * the reducer side. Key columns are passed to the reducer in the "key". + */ + protected transient ExprNodeEvaluator[] keyEval; + /** + * The evaluators for the value columns. Value columns are passed to reducer + * in the "value". + */ + protected transient ExprNodeEvaluator[] valueEval; + /** + * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in + * Hive language). Partition columns decide the reducer that the current row + * goes to. Partition columns are not passed to reducer. 
+ */ + protected transient ExprNodeEvaluator[] partitionEval; + + // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is + // ready + transient Serializer keySerializer; + transient boolean keyIsText; + transient Serializer valueSerializer; + transient int tag; + transient byte[] tagByte = new byte[1]; + transient protected int numDistributionKeys; + transient protected int numDistinctExprs; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + + try { + keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getKeyCols()) { + keyEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + numDistributionKeys = conf.getNumDistributionKeys(); + distinctColIndices = conf.getDistinctColumnIndices(); + numDistinctExprs = distinctColIndices.size(); + + valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getValueCols()) { + valueEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getPartitionCols()) { + partitionEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + tag = conf.getTag(); + tagByte[0] = (byte) tag; + LOG.info("Using tag = " + tag); + + TableDesc keyTableDesc = conf.getKeySerializeInfo(); + keySerializer = (Serializer) keyTableDesc.getDeserializerClass() + .newInstance(); + keySerializer.initialize(null, keyTableDesc.getProperties()); + keyIsText = keySerializer.getSerializedClass().equals(Text.class); + + TableDesc valueTableDesc = conf.getValueSerializeInfo(); + valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() + .newInstance(); + valueSerializer.initialize(null, valueTableDesc.getProperties()); + + firstRow = true; + initializeChildren(hconf); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + transient InspectableObject tempInspectableObject = new InspectableObject(); + transient HiveKey keyWritable = new HiveKey(); + transient Writable value; + + transient StructObjectInspector keyObjectInspector; + transient StructObjectInspector valueObjectInspector; + transient ObjectInspector[] partitionObjectInspectors; + + transient Object[][] cachedKeys; + transient Object[] cachedValues; + transient List> distinctColIndices; + + boolean firstRow; + + transient Random random; + + /** + * Initializes array of ExprNodeEvaluator. Adds Union field for distinct + * column indices for group by. + * Puts the return values into a StructObjectInspector with output column + * names. + * + * If distinctColIndices is empty, the object inspector is same as + * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} + */ + protected static StructObjectInspector initEvaluatorsAndReturnStruct( + ExprNodeEvaluator[] evals, List> distinctColIndices, + List outputColNames, + int length, ObjectInspector rowInspector) + throws HiveException { + int inspectorLen = evals.length > length ? 
length + 1 : evals.length; + List sois = new ArrayList(inspectorLen); + + // keys + ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector); + sois.addAll(Arrays.asList(fieldObjectInspectors)); + + if (evals.length > length) { + // union keys + List uois = new ArrayList(); + for (List distinctCols : distinctColIndices) { + List names = new ArrayList(); + List eois = new ArrayList(); + int numExprs = 0; + for (int i : distinctCols) { + names.add(HiveConf.getColumnInternalName(numExprs)); + eois.add(evals[i].initialize(rowInspector)); + numExprs++; + } + uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois)); + } + UnionObjectInspector uoi = + ObjectInspectorFactory.getStandardUnionObjectInspector(uois); + sois.add(uoi); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois); + } + + @Override + public abstract void processOp(Object row, int tag) throws HiveException; + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return "BaseReduceSink"; + } + + @Override + public OperatorType getType() { + return null; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java new file mode 100644 index 0000000..13efa7f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.io.LongWritable; + +/** + * Correlation composite operator implementation. This operator is used only in map phase for + * sharing table scan. Suppose that there are multiple operation paths (e.g. two different + * predicates on a table ) that share a common table. A row will be processed by these operation + * paths. To tag which operation paths actually forward this row, CorrelationCompositeOperator is + * used. For a row, this operator will buffer forwarded rows from its parents and then tag this row + * with a operation path tag indicating which paths forwarded this row. 
Right now, since operation + * path tag used in ReduceSinkOperator has 1 byte, this operator can have at most 8 parents + * (operation paths). For example, suppose that the common table is T and predicates P1 and P2 will + * be used in sub-queries SQ1 and SQ2, respectively. The CorrelationCompositeOperator + * will apply P1 and P2 on the row and tag the record based on if P1 or P2 is true. + **/ +public class CorrelationCompositeOperator extends Operator implements +Serializable { + + static final private Log LOG = LogFactory.getLog(Driver.class.getName()); + + public static enum Counter { + FORWARDED + } + + private static final long serialVersionUID = 1L; + + private ReduceSinkOperator correspondingReduceSinkOperators; + + private transient final LongWritable forwarded_count; + + private transient boolean firstRow; + + private int[] allOperationPathTags; + + private Object[] rowBuffer; // buffer the output from multiple parents + + public CorrelationCompositeOperator() { + super(); + forwarded_count = new LongWritable(); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + firstRow = true; + rowBuffer = new Object[parentOperators.size()]; + correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator(); + allOperationPathTags = conf.getAllOperationPathTags(); + statsMap.put(Counter.FORWARDED, forwarded_count); + outputObjInspector = + ObjectInspectorUtils.getStandardObjectInspector(outputObjInspector, + ObjectInspectorCopyOption.JAVA); + + // initialize its children + initializeChildren(hconf); + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + rowBuffer[tag] = + ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[tag], + ObjectInspectorCopyOption.JAVA); + } + + private void evaluateBuffer() { + ArrayList operationPathTags = new ArrayList(); + boolean isForward = false; + Object forwardedRow = null; + for (int i = 0; i < rowBuffer.length; i++) { + if (rowBuffer[i] != null) { + isForward = true; + operationPathTags.add(allOperationPathTags[i]); + if (forwardedRow == null) { + forwardedRow = rowBuffer[i]; + } + } + } + if (isForward) { + assert correspondingReduceSinkOperators != null; + correspondingReduceSinkOperators.setOperationPathTags(operationPathTags); + forwarded_count.set(forwarded_count.get() + 1); + try { + forward(forwardedRow, null); + } catch (HiveException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + for (int i = 0; i < rowBuffer.length; i++) { + rowBuffer[i] = null; + } + } + + @Override + public void setRowNumber(long rowNumber) { + this.rowNumber = rowNumber; + if (childOperators == null) { + return; + } + for (int i = 0; i < childOperatorsArray.length; i++) { + assert rowNumber >= childOperatorsArray[i].getRowNumber(); + if (rowNumber != childOperatorsArray[i].getRowNumber()) { + childOperatorsArray[i].setRowNumber(rowNumber); + } + } + if (firstRow) { + for (int i = 0; i < rowBuffer.length; i++) { + rowBuffer[i] = null; + } + firstRow = false; + } else { + evaluateBuffer(); + } + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if (!abort) { + evaluateBuffer(); + } + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "CCO"; + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } + +} diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java new file mode 100644 index 0000000..7d56da8 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * CorrelationLocalSimulativeReduceSinkOperator simulates a ReduceSinkOperator and sends output to + * another operator (JOIN or GBY). CorrelationLocalSimulativeReduceSinkOperator is used only in + * reduce phase. Basically, it is a bridge from one JOIN or GBY operator to another JOIN or GBY + * operator. A CorrelationLocalSimulativeReduceSinkOperator will take care actions of startGroup and + * endGroup of its succeeding JOIN or GBY operator. + * Example: A query involves a JOIN operator and a GBY operator and the GBY operator consume the + * output of the JOIN operator. In this case, if join keys and group by keys are the same, we do not + * need to shuffle the data again, since data has been already partitioned by the JOIN operator. + * Thus, in CorrelationOptimizer, the ReduceSinkOperator between JOIN and GBY operator will be + * replaced by a CorrelationLocalSimulativeReduceSinkOperator and the JOIN operator and GBY operator + * will be executed in a single reduce phase. 
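+ *
+ * For example, a query of the following shape (hypothetical tables t1 and t2, used here only for
+ * illustration) has identical join and group by keys, so the JOIN and the GROUP BY can be
+ * evaluated in one reduce phase:
+ *
+ *   SELECT t1.key, count(1)
+ *   FROM t1 JOIN t2 ON (t1.key = t2.key)
+ *   GROUP BY t1.key;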
+ **/ +public class CorrelationLocalSimulativeReduceSinkOperator +extends BaseReduceSinkOperator { + + private static final long serialVersionUID = 1L; + + transient TableDesc keyTableDesc; + transient TableDesc valueTableDesc; + + transient Deserializer inputKeyDeserializer; + + transient SerDe inputValueDeserializer; + + transient ByteWritable tagWritable; + + transient ObjectInspector outputKeyObjectInspector; + transient ObjectInspector outputValueObjectInspector; + transient ObjectInspector[] outputPartitionObjectInspectors; + + private ArrayList forwardedRow; + private Object keyObject; + private Object valueObject; + + private BytesWritable groupKey; + + private static String[] fieldNames; + + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + public CorrelationLocalSimulativeReduceSinkOperator() { + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + forwardedRow = new ArrayList(3); + tagByte = new byte[1]; + tagWritable = new ByteWritable(); + tempInspectableObject = new InspectableObject(); + keyWritable = new HiveKey(); + assert childOperatorsArray.length == 1; + try { + keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getKeyCols()) { + keyEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + numDistributionKeys = conf.getNumDistributionKeys(); + distinctColIndices = conf.getDistinctColumnIndices(); + numDistinctExprs = distinctColIndices.size(); + + valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getValueCols()) { + valueEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + tag = conf.getTag(); + tagByte[0] = (byte) tag; + tagWritable.set(tagByte[0]); + LOG.info("Using tag = " + tag); + + TableDesc keyTableDesc = conf.getKeySerializeInfo(); + keySerializer = (Serializer) keyTableDesc.getDeserializerClass() + .newInstance(); + keySerializer.initialize(null, keyTableDesc.getProperties()); + keyIsText = keySerializer.getSerializedClass().equals(Text.class); + + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc + .getDeserializerClass(), null); + inputKeyDeserializer.initialize(null, keyTableDesc.getProperties()); + outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector(); + + TableDesc valueTableDesc = conf.getValueSerializeInfo(); + valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() + .newInstance(); + valueSerializer.initialize(null, valueTableDesc.getProperties()); + + inputValueDeserializer = (SerDe) ReflectionUtils.newInstance( + valueTableDesc.getDeserializerClass(), null); + inputValueDeserializer.initialize(null, valueTableDesc + .getProperties()); + outputValueObjectInspector = inputValueDeserializer.getObjectInspector(); + + ObjectInspector rowInspector = inputObjInspectors[0]; + + keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, + distinctColIndices, + conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); + valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf + .getOutputValueColumnNames(), rowInspector); + int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; + int keyLen = numDistinctExprs > 0 ? 
numDistributionKeys + 1 : + numDistributionKeys; + cachedKeys = new Object[numKeys][keyLen]; + cachedValues = new Object[valueEval.length]; + assert cachedKeys.length == 1; + + ArrayList ois = new ArrayList(); + ois.add(outputKeyObjectInspector); + ois.add(outputValueObjectInspector); + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + + LOG.info("Simulative ReduceSink inputObjInspectors" + + ((StructObjectInspector) inputObjInspectors[0]).getTypeName()); + + LOG.info("Simulative ReduceSink outputObjInspectors " + + this.getChildOperators().get(0).getParentOperators().indexOf(this) + + " " + ((StructObjectInspector) outputObjInspector).getTypeName()); + + initializeChildren(hconf); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + try { + // Evaluate the value + for (int i = 0; i < valueEval.length; i++) { + cachedValues[i] = valueEval[i].evaluate(row); + } + // Serialize the value + value = valueSerializer.serialize(cachedValues, valueObjectInspector); + valueObject = inputValueDeserializer.deserialize(value); + + // Evaluate the keys + Object[] distributionKeys = new Object[numDistributionKeys]; + for (int i = 0; i < numDistributionKeys; i++) { + distributionKeys[i] = keyEval[i].evaluate(row); + } + + if (numDistinctExprs > 0) { + // with distinct key(s) + for (int i = 0; i < numDistinctExprs; i++) { + System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys); + Object[] distinctParameters = + new Object[distinctColIndices.get(i).size()]; + for (int j = 0; j < distinctParameters.length; j++) { + distinctParameters[j] = + keyEval[distinctColIndices.get(i).get(j)].evaluate(row); + } + cachedKeys[i][numDistributionKeys] = + new StandardUnion((byte) i, distinctParameters); + } + } else { + // no distinct key + System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys); + } + + for (int i = 0; i < cachedKeys.length; i++) { + if (keyIsText) { + Text key = (Text) keySerializer.serialize(cachedKeys[i], + keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + } else { + // Must be BytesWritable + BytesWritable key = (BytesWritable) keySerializer.serialize( + cachedKeys[i], keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + } + if (!keyWritable.equals(groupKey)) { + try { + keyObject = inputKeyDeserializer.deserialize(keyWritable); + } catch (Exception e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input key from " + + Utilities.formatBinaryString(keyWritable.get(), 0, + keyWritable.getSize()) + " with properties " + + keyTableDesc.getProperties(), e); + } + if (groupKey == null) { // the first group + groupKey = new BytesWritable(); + } else { + // if its child has not been ended, end it + if (!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())) { + childOperatorsArray[0].endGroup(); + } + } + groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); + if (!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())) { + childOperatorsArray[0].startGroup(); + childOperatorsArray[0].setGroupKeyObject(keyObject); + childOperatorsArray[0].setBytesWritableGroupKey(groupKey); + } + } + forwardedRow.clear(); + forwardedRow.add(keyObject); + forwardedRow.add(valueObject); + 
forwardedRow.add(tagWritable); + forward(forwardedRow, outputObjInspector); + } + } catch (SerDeException e1) { + e1.printStackTrace(); + } + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if (!abort) { + if (childOperatorsArray[0].allInitializedParentsAreClosed()) { + childOperatorsArray[0].endGroup(); + } + } + } + + @Override + public void startGroup() throws HiveException { + // do nothing + } + + @Override + public void endGroup() throws HiveException { + // do nothing + } + + @Override + public void setGroupKeyObject(Object keyObject) { + // do nothing + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "CLSReduceSink"; + } + + @Override + public OperatorType getType() { + return null; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java new file mode 100644 index 0000000..b667494 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; + +/** + * CorrelationReducerDispatchOperator is an operator used by MapReduce join optimized by + * CorrelationOptimizer. If used, CorrelationReducerDispatchOperator is the first operator in reduce + * phase. 
In the case that multiple operation paths are merged into a single one, it will dispatch + * the record to corresponding JOIN or GBY operators. Every child of this operator is associated + * with a DispatcherHnadler, which evaluates the input row of this operator and then select + * corresponding fields for its associated child. + */ +public class CorrelationReducerDispatchOperator extends Operator +implements Serializable { + + private static final long serialVersionUID = 1L; + private static String[] fieldNames; + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + protected static class DispatchHandler { + + protected Log l4j = LogFactory.getLog(this.getClass().getName()); + + private final ObjectInspector[] inputObjInspector; + private ObjectInspector outputObjInspector; + private ObjectInspector keyObjInspector; + private ObjectInspector valueObjInspector; + private final byte inputTag; + private final byte outputTag; + private final byte childIndx; + private final ByteWritable outputTagByteWritable; + private final SelectDesc selectDesc; + private final SelectDesc keySelectDesc; + private ExprNodeEvaluator[] keyEval; + private ExprNodeEvaluator[] eval; + + // counters for debugging + private transient long cntr = 0; + private transient long nextCntr = 1; + + private long getNextCntr(long cntr) { + // A very simple counter to keep track of number of rows processed by an + // operator. It dumps + // every 1 million times, and quickly before that + if (cntr >= 1000000) { + return cntr + 1000000; + } + return 10 * cntr; + } + + public long getCntr() { + return this.cntr; + } + + private final Log LOG; + private final boolean isLogInfoEnabled; + private final String id; + + public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx, + byte outputTag, + SelectDesc selectDesc, SelectDesc keySelectDesc, Log LOG, String id) + throws HiveException { + this.inputObjInspector = inputObjInspector; + assert this.inputObjInspector.length == 1; + this.inputTag = inputTag; + this.childIndx = childIndx; + this.outputTag = outputTag; + this.selectDesc = selectDesc; + this.keySelectDesc = keySelectDesc; + this.outputTagByteWritable = new ByteWritable(outputTag); + this.LOG = LOG; + this.isLogInfoEnabled = LOG.isInfoEnabled(); + this.id = id; + init(); + } + + private void init() throws HiveException { + ArrayList ois = new ArrayList(); + if (keySelectDesc.isSelStarNoCompute()) { + ois.add((ObjectInspector) ((ArrayList) inputObjInspector[0]).get(0)); + } else { + ArrayList colList = this.keySelectDesc.getColList(); + keyEval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + keyObjInspector = + initEvaluatorsAndReturnStruct(keyEval, keySelectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]) + .getAllStructFieldRefs().get(0).getFieldObjectInspector()); + + ois.add(keyObjInspector); + l4j.info("Key: input tag " + (int) inputTag + ", output tag " + (int) outputTag + + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); + } + if (selectDesc.isSelStarNoCompute()) { + ois.add((ObjectInspector) ((ArrayList) inputObjInspector[0]).get(1)); + } else { + ArrayList colList = 
this.selectDesc.getColList(); + eval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + valueObjInspector = + initEvaluatorsAndReturnStruct(eval, selectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]) + .getAllStructFieldRefs().get(1).getFieldObjectInspector()); + + ois.add(valueObjInspector); + l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag + + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); + } + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag + + ", SELECT outputObjInspector" + + ((StructObjectInspector) outputObjInspector).getTypeName()); + } + + public ObjectInspector getOutputObjInspector() { + return outputObjInspector; + } + + public Object process(Object row) throws HiveException { + ArrayList keyOutput = new ArrayList(keyEval.length); + Object[] valueOutput = new Object[eval.length]; + ArrayList outputRow = new ArrayList(3); + List thisRow = (List) row; + if (keySelectDesc.isSelStarNoCompute()) { + outputRow.add(thisRow.get(0)); + } else { + Object key = thisRow.get(0); + for (int j = 0; j < keyEval.length; j++) { + try { + keyOutput.add(keyEval[j].evaluate(key)); + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new HiveException("Error evaluating " + + keySelectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(keyOutput); + } + + if (selectDesc.isSelStarNoCompute()) { + outputRow.add(thisRow.get(1)); + } else { + Object value = thisRow.get(1); + for (int j = 0; j < eval.length; j++) { + try { + valueOutput[j] = eval[j].evaluate(value); + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new HiveException("Error evaluating " + + selectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(valueOutput); + } + outputRow.add(outputTagByteWritable); + + if (isLogInfoEnabled) { + cntr++; + if (cntr == nextCntr) { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + + outputTag + "), forwarding " + cntr + " rows"); + nextCntr = getNextCntr(cntr); + } + } + return outputRow; + } + + public void printCloseOpLog() { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + + outputTag + "), forwarded " + cntr + " rows"); + } + } + + // inputTag->(Child->List) + private HashMap>> dispatchConf; + // inputTag->(Child->List) + private HashMap>> dispatchValueSelectDescConf; + // inputTag->(Child->List) + private HashMap>> dispatchKeySelectDescConf; + // inputTag->(Child->List) + private HashMap>> dispatchHandlers; + // Child->(outputTag->DispatchHandler) + private HashMap> child2OutputTag2DispatchHandlers; + // Child->Child's inputObjInspectors + private HashMap childInputObjInspectors; + + private int operationPathTag; + private int inputTag; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + dispatchConf = conf.getDispatchConf(); + dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf(); + dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf(); + dispatchHandlers = new HashMap>>(); + for (Entry>> entry : 
dispatchConf.entrySet()) { + HashMap> tmp = + new HashMap>(); + for (Entry> child2outputTag : entry.getValue().entrySet()) { + tmp.put(child2outputTag.getKey(), new ArrayList()); + int indx = 0; + for (Integer outputTag : child2outputTag.getValue()) { + tmp.get(child2outputTag.getKey()).add( + new DispatchHandler(new ObjectInspector[] {inputObjInspectors[entry.getKey()]}, + entry.getKey().byteValue(), child2outputTag.getKey().byteValue(), + outputTag.byteValue(), dispatchValueSelectDescConf.get(entry.getKey()) + .get(child2outputTag.getKey()).get(indx), + dispatchKeySelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()) + .get(indx), LOG, id)); + indx++; + } + } + dispatchHandlers.put(entry.getKey(), tmp); + } + + child2OutputTag2DispatchHandlers = new HashMap>(); + for (Entry>> entry : dispatchConf.entrySet()) { + for (Entry> child2outputTag : entry.getValue().entrySet()) { + if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) { + child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(), + new HashMap()); + } + int indx = 0; + for (Integer outputTag : child2outputTag.getValue()) { + child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()). + put(outputTag, + dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx)); + indx++; + } + } + } + + childInputObjInspectors = new HashMap(); + for (Entry> entry : child2OutputTag2DispatchHandlers + .entrySet()) { + Integer l = Collections.max(entry.getValue().keySet()); + ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1]; + for (Entry e : entry.getValue().entrySet()) { + if (e.getKey().intValue() == -1) { + assert childObjInspectors.length == 1; + childObjInspectors[0] = e.getValue().getOutputObjInspector(); + } else { + childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector(); + } + } + childInputObjInspectors.put(entry.getKey(), childObjInspectors); + } + + initializeChildren(hconf); + } + + // Each child should has its own outputObjInspector + @Override + protected void initializeChildren(Configuration hconf) throws HiveException { + state = State.INIT; + LOG.info("Operator " + id + " " + getName() + " initialized"); + if (childOperators == null) { + return; + } + LOG.info("Initializing children of " + id + " " + getName()); + for (int i = 0; i < childOperatorsArray.length; i++) { + LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " + + childOperatorsArray[i].getName() + + " " + childInputObjInspectors.get(i).length); + childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i)); + if (reporter != null) { + childOperatorsArray[i].setReporter(reporter); + } + } + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + ArrayList thisRow = (ArrayList) row; + assert thisRow.size() == 4; + operationPathTag = ((ByteWritable) thisRow.get(3)).get(); + inputTag = ((ByteWritable) thisRow.get(2)).get(); + forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]); + } + + @Override + public void forward(Object row, ObjectInspector rowInspector) + throws HiveException { + if ((++outputRows % 1000) == 0) { + if (counterNameToEnum != null) { + incrCounter(numOutputRowsCntr, outputRows); + outputRows = 0; + } + } + + if (childOperatorsArray == null && childOperators != null) { + throw new HiveException("Internal Hive error during operator initialization."); + } + + if ((childOperatorsArray == null) || (getDone())) { + return; + } + + int childrenDone = 0; + int 
forwardFlag = 1; + assert childOperatorsArray.length <= 8; + for (int i = 0; i < childOperatorsArray.length; i++) { + Operator o = childOperatorsArray[i]; + if (o.getDone()) { + childrenDone++; + } else { + if ((operationPathTag & (forwardFlag << i)) != 0) { + for (int j = 0; j < dispatchHandlers.get(inputTag).get(i).size(); j++) { + o.process(dispatchHandlers.get(inputTag).get(i).get(j).process(row), + dispatchConf.get(inputTag).get(i).get(j)); + } + } + } + } + + // if all children are done, this operator is also done + if (childrenDone == childOperatorsArray.length) { + setDone(true); + } + } + + @Override + protected void closeOp(boolean abort) throws HiveException { + // log the number of rows forwarded from each dispatcherHandler + for (HashMap> childIndx2DispatchHandlers : dispatchHandlers + .values()) { + for (ArrayList dispatchHandlers : childIndx2DispatchHandlers.values()) { + for (DispatchHandler dispatchHandler : dispatchHandlers) { + dispatchHandler.printCloseOpLog(); + } + } + } + } + + @Override + public void setGroupKeyObject(Object keyObject) { + this.groupKeyObject = keyObject; + for (Operator op : childOperators) { + op.setGroupKeyObject(keyObject); + } + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "CDP"; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java index 283d0b6..0ade890 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java @@ -61,6 +61,7 @@ public class ExecReducer extends MapReduceBase implements Reducer { private Reporter rp; private boolean abort = false; private boolean isTagged = false; + private boolean isOperationPathTagged = false; //If operation path is tagged private long cntr = 0; private long nextCntr = 1; @@ -116,6 +117,7 @@ public class ExecReducer extends MapReduceBase implements Reducer { reducer.setParentOperators(null); // clear out any parents as reducer is the // root isTagged = gWork.getNeedsTagging(); + isOperationPathTagged = gWork.getNeedsOperationPathTagging(); try { keyTableDesc = gWork.getKeyDesc(); inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc @@ -164,8 +166,9 @@ public class ExecReducer extends MapReduceBase implements Reducer { private BytesWritable groupKey; - ArrayList row = new ArrayList(3); + ArrayList row = new ArrayList(4); ByteWritable tag = new ByteWritable(); + ByteWritable operationPathTags = new ByteWritable(); public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { @@ -188,6 +191,14 @@ public class ExecReducer extends MapReduceBase implements Reducer { keyWritable.setSize(size); } + operationPathTags.set((byte)0); + if (isOperationPathTagged) { + // remove the operation plan tag + int size = keyWritable.getSize() - 1; + operationPathTags.set(keyWritable.get()[size]); + keyWritable.setSize(size); + } + if (!keyWritable.equals(groupKey)) { // If a operator wants to do some work at the beginning of a group if (groupKey == null) { // the first group @@ -234,6 +245,7 @@ public class ExecReducer extends MapReduceBase implements Reducer { row.add(valueObject[tag.get()]); // The tag is not used any more, we should remove it. 
row.add(tag); + row.add(operationPathTags); if (isLogInfoEnabled) { cntr++; if (cntr == nextCntr) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index e3ed13a..8eaa47c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -48,7 +48,7 @@ public class JoinOperator extends CommonJoinOperator implements /** * SkewkeyTableCounter. - * + * */ public static enum SkewkeyTableCounter { SKEWJOINFOLLOWUPJOBS @@ -141,7 +141,7 @@ public class JoinOperator extends CommonJoinOperator implements /** * All done. - * + * */ @Override public void closeOp(boolean abort) throws HiveException { @@ -210,6 +210,7 @@ public class JoinOperator extends CommonJoinOperator implements /** * This is a similar implementation of FileSinkOperator.moveFileToFinalPath. + * * @param specPath * @param hconf * @param success @@ -218,7 +219,7 @@ public class JoinOperator extends CommonJoinOperator implements * @throws IOException * @throws HiveException */ - private void mvFileToFinalPath(String specPath, Configuration hconf, + private void mvFileToFinalPath(String specPath, Configuration hconf, boolean success, Log log) throws IOException, HiveException { FileSystem fs = (new Path(specPath)).getFileSystem(hconf); @@ -247,7 +248,7 @@ public class JoinOperator extends CommonJoinOperator implements /** * Forward a record of join results. - * + * * @throws HiveException */ @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index f0c35e7..e010d3e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.OutputCollector; @@ -1359,4 +1360,52 @@ public abstract class Operator implements Serializable,C return ret; } + + //bytesWritableGroupKey is only used when a query plan is optimized by CorrelationOptimizer. + // CorrelationLocalSimulativeReduceSinkOperator will use this variable to determine when it needs to start or end the group + // for its child operator. 
+ protected BytesWritable bytesWritableGroupKey; + + public void setBytesWritableGroupKey(BytesWritable groupKey) { + if (bytesWritableGroupKey == null) { + bytesWritableGroupKey = new BytesWritable(); + } + bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize()); + } + + public BytesWritable getBytesWritableGroupKey() { + return bytesWritableGroupKey; + } + + // The number of current row + protected long rowNumber; + + public void initializeRowNumber() { + this.rowNumber = 0L; + LOG.info("Operator " + id + " " + getName() + " row number initialized to 0"); + if (childOperators == null) { + return; + } + LOG.info("Initializing row numbers of children of " + id + " " + getName()); + for (int i = 0; i < childOperatorsArray.length; i++) { + childOperatorsArray[i].initializeRowNumber(); + } + } + + public void setRowNumber(long rowNumber) { + this.rowNumber = rowNumber; + if (childOperators == null) { + return; + } + for (int i = 0; i < childOperatorsArray.length; i++) { + assert rowNumber >= childOperatorsArray[i].getRowNumber(); + if (rowNumber != childOperatorsArray[i].getRowNumber()) { + childOperatorsArray[i].setRowNumber(rowNumber); + } + } + } + + public long getRowNumber() { + return rowNumber; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index 0c22141..064afc9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -22,6 +22,9 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hive.ql.plan.CollectDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; import org.apache.hadoop.hive.ql.plan.ExtractDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; @@ -91,6 +94,12 @@ public final class OperatorFactory { HashTableDummyOperator.class)); opvec.add(new OpTuple(HashTableSinkDesc.class, HashTableSinkOperator.class)); + opvec.add(new OpTuple(CorrelationCompositeDesc.class, + CorrelationCompositeOperator.class)); + opvec.add(new OpTuple(CorrelationReducerDispatchDesc.class, + CorrelationReducerDispatchOperator.class)); + opvec.add(new OpTuple(CorrelationLocalSimulativeReduceSinkDesc.class, + CorrelationLocalSimulativeReduceSinkOperator.class)); } public static Operator get(Class opClass) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index a2caeed..555e595 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -21,173 +21,43 @@ package org.apache.hadoop.hive.ql.exec; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Random; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import 
org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.Serializer; -import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; /** * Reduce Sink Operator sends output to the reduce stage. **/ -public class ReduceSinkOperator extends TerminalOperator +public class ReduceSinkOperator extends BaseReduceSinkOperator implements Serializable { private static final long serialVersionUID = 1L; - /** - * The evaluators for the key columns. Key columns decide the sort order on - * the reducer side. Key columns are passed to the reducer in the "key". - */ - protected transient ExprNodeEvaluator[] keyEval; - /** - * The evaluators for the value columns. Value columns are passed to reducer - * in the "value". - */ - protected transient ExprNodeEvaluator[] valueEval; - /** - * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in - * Hive language). Partition columns decide the reducer that the current row - * goes to. Partition columns are not passed to reducer. - */ - protected transient ExprNodeEvaluator[] partitionEval; - - // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is - // ready - transient Serializer keySerializer; - transient boolean keyIsText; - transient Serializer valueSerializer; - transient int tag; - transient byte[] tagByte = new byte[1]; - transient protected int numDistributionKeys; - transient protected int numDistinctExprs; - - @Override - protected void initializeOp(Configuration hconf) throws HiveException { - - try { - keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; - int i = 0; - for (ExprNodeDesc e : conf.getKeyCols()) { - keyEval[i++] = ExprNodeEvaluatorFactory.get(e); - } - - numDistributionKeys = conf.getNumDistributionKeys(); - distinctColIndices = conf.getDistinctColumnIndices(); - numDistinctExprs = distinctColIndices.size(); - - valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; - i = 0; - for (ExprNodeDesc e : conf.getValueCols()) { - valueEval[i++] = ExprNodeEvaluatorFactory.get(e); - } - - partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()]; - i = 0; - for (ExprNodeDesc e : conf.getPartitionCols()) { - partitionEval[i++] = ExprNodeEvaluatorFactory.get(e); - } - - tag = conf.getTag(); - tagByte[0] = (byte) tag; - LOG.info("Using tag = " + tag); - - TableDesc keyTableDesc = conf.getKeySerializeInfo(); - keySerializer = (Serializer) keyTableDesc.getDeserializerClass() - .newInstance(); - keySerializer.initialize(null, keyTableDesc.getProperties()); - keyIsText = keySerializer.getSerializedClass().equals(Text.class); + private final ArrayList operationPathTags = new ArrayList(); // operation path tags + private final byte[] operationPathTagsByte = new byte[1]; - TableDesc valueTableDesc = conf.getValueSerializeInfo(); - valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() - .newInstance(); - valueSerializer.initialize(null, 
valueTableDesc.getProperties()); - - firstRow = true; - initializeChildren(hconf); - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); + public void setOperationPathTags(ArrayList operationPathTags) { + this.operationPathTags.addAll(operationPathTags); + int operationPathTagsInt = 0; + int tmp = 1; + for (Integer operationPathTag: operationPathTags) { + operationPathTagsInt += tmp << operationPathTag.intValue(); } + operationPathTagsByte[0] = (byte) operationPathTagsInt; } - transient InspectableObject tempInspectableObject = new InspectableObject(); - transient HiveKey keyWritable = new HiveKey(); - transient Writable value; - - transient StructObjectInspector keyObjectInspector; - transient StructObjectInspector valueObjectInspector; - transient ObjectInspector[] partitionObjectInspectors; - - transient Object[][] cachedKeys; - transient Object[] cachedValues; - transient List> distinctColIndices; - - boolean firstRow; - - transient Random random; - - /** - * Initializes array of ExprNodeEvaluator. Adds Union field for distinct - * column indices for group by. - * Puts the return values into a StructObjectInspector with output column - * names. - * - * If distinctColIndices is empty, the object inspector is same as - * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} - */ - protected static StructObjectInspector initEvaluatorsAndReturnStruct( - ExprNodeEvaluator[] evals, List> distinctColIndices, - List outputColNames, - int length, ObjectInspector rowInspector) - throws HiveException { - int inspectorLen = evals.length > length ? length + 1 : evals.length; - List sois = new ArrayList(inspectorLen); - - // keys - ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector); - sois.addAll(Arrays.asList(fieldObjectInspectors)); - - if (evals.length > length) { - // union keys - List uois = new ArrayList(); - for (List distinctCols : distinctColIndices) { - List names = new ArrayList(); - List eois = new ArrayList(); - int numExprs = 0; - for (int i : distinctCols) { - names.add(HiveConf.getColumnInternalName(numExprs)); - eois.add(evals[i].initialize(rowInspector)); - numExprs++; - } - uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois)); - } - UnionObjectInspector uoi = - ObjectInspectorFactory.getStandardUnionObjectInspector(uois); - sois.add(uoi); - } - return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois ); + public ArrayList getOperationPathTags() { + return this.operationPathTags; } - + @Override public void processOp(Object row, int tag) throws HiveException { try { @@ -267,9 +137,18 @@ public class ReduceSinkOperator extends TerminalOperator keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); - keyWritable.setSize(keyLength + 1); + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.setSize(keyLength + 1); + } else { + keyWritable.setSize(keyLength + 2); + } System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = tagByte[0]; + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.get()[keyLength] = tagByte[0]; + } else { + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } } } else { // Must be BytesWritable @@ -279,9 +158,18 @@ public class ReduceSinkOperator extends TerminalOperator keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int 
keyLength = key.getLength(); - keyWritable.setSize(keyLength + 1); + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.setSize(keyLength + 1); + } else { + keyWritable.setSize(keyLength + 2); + } System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = tagByte[0]; + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.get()[keyLength] = tagByte[0]; + } else { + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } } } keyWritable.setHashCode(keyHashCode); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 1a40630..131f640 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -583,7 +583,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator imp } @Override - protected boolean allInitializedParentsAreClosed() { + public boolean allInitializedParentsAreClosed() { return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index dffdd7b..1881f9e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -80,6 +80,9 @@ public class TableScanOperator extends Operator implements if (conf != null && conf.isGatherStats()) { gatherStats(row); } + if (conf != null && conf.isForwardRowNumber()) { + setRowNumber(rowNumber+1); + } forward(row, inputObjInspectors[tag]); } @@ -169,6 +172,12 @@ public class TableScanOperator extends Operator implements if (conf == null) { return; } + + LOG.info(this.getName() + " forward row number " + conf.isForwardRowNumber()); + if(conf.isForwardRowNumber()){ + initializeRowNumber(); + } + if (!conf.isGatherStats()) { return; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java new file mode 100644 index 0000000..eae531c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java @@ -0,0 +1,964 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QB; +import org.apache.hadoop.hive.ql.parse.QBExpr; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; + +/** + * Implementation of correlation optimizer. The optimization is based on + * the paper + * "YSmart: Yet Another SQL-to-MapReduce Translator" + * (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang). + * This optimizer first detects three kinds of + * correlations, Input Correlation (IC), Transit Correlation (TC) and Job Flow Correlation (JFC), + * and then merge correlated MapReduce-jobs (MR-jobs) into one MR-job. + * Correlation correlation detection and query plan tree transformation is the last transformation + * in the {@link Optimizer}. Since opColumnExprMap, opParseCtx, opRowResolver may be changed by + * other optimizers, + * currently, correlation optimizer has two phases. The first phase is the first transformation in + * the {@link Optimizer}. In the first phase, original opColumnExprMap, opParseCtx, opRowResolver + * will be recorded. Then, the second phase (the last transformation) will perform correlation + * detection and + * query plan tree transformation. + * + *
+ * Correlations
+ *
+ * For the definitions of these correlations, see the original paper.
+ *
+ * Rules
+ *
+ * The rules for merging correlated MR-jobs implemented in this correlation optimizer are:
+ *   • If an MR-job for a Join operation has the same partitioning keys as all of its preceding
+ *     MR-jobs, the correlation optimizer merges these MR-jobs into one MR-job.
+ *   • If an MR-job for a GroupBy and Aggregation operation has the same partitioning keys as its
+ *     preceding MR-job, the correlation optimizer merges these two MR-jobs into one MR-job.
+ *   • Note: in the current implementation, if the correlation optimizer detects that the MR-jobs
+ *     of a sub-plan tree are correlated, it transforms this sub-plan tree into a single MR-job
+ *     only when the input of this sub-plan tree is not a temporary table. Otherwise, the current
+ *     implementation ignores this sub-plan tree.
+ *
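+ * For illustration, consider a query of the following shape (the table names x and y and their
+ * columns are hypothetical and used only to sketch the rules above): the MR-job for the Join and
+ * the MR-job for the GroupBy both partition their data on x.key, so the two MR-jobs can be merged
+ * into a single MR-job.
+ *
+ *   SELECT x.key, count(y.value)
+ *   FROM x JOIN y ON (x.key = y.key)
+ *   GROUP BY x.key;
+ *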
+ * Future Work
+ *
+ * There are several pieces of future work that would enhance the correlation optimizer. Here are
+ * a few examples:
+ *   • Add a new rule: if two MR-jobs share the same partitioning keys and have common input
+ *     tables, merge these two MR-jobs into a single MR-job.
+ *   • The current implementation treats MR-jobs with identical partitioning keys as correlated
+ *     MR-jobs. This condition can be relaxed to require only common partitioning keys.
+ *   • The current implementation cannot optimize MR-jobs for aggregation functions that use the
+ *     DISTINCT keyword; these should be supported in a future version of the correlation
+ *     optimizer.
+ *   • Optimize queries that involve self-joins.
  • + */ + +public class CorrelationOptimizer implements Transform { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName()); + private final HashMap AliastoTabName; + private final HashMap AliastoTab; + + public CorrelationOptimizer() { + super(); + AliastoTabName = new HashMap(); + AliastoTab = new HashMap(); + pGraphContext = null; + } + + private boolean initializeAliastoTabNameMapping(QB qb) { + // If any sub-query's qb is null, CorrelationOptimizer will not optimize this query. + // e.g. auto_join27.q + if (qb == null) { + return false; + } + boolean ret = true; + for (String alias : qb.getAliases()) { + AliastoTabName.put(alias, qb.getTabNameForAlias(alias)); + AliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias)); + } + for (String subqalias : qb.getSubqAliases()) { + QBExpr qbexpr = qb.getSubqForAlias(subqalias); + ret = ret && initializeAliastoTabNameMapping(qbexpr.getQB()); + } + return ret; + } + + protected ParseContext pGraphContext; + private LinkedHashMap, OpParseContext> opParseCtx; + private final LinkedHashMap, OpParseContext> originalOpParseCtx = + new LinkedHashMap, OpParseContext>(); + private final LinkedHashMap, RowResolver> originalOpRowResolver = + new LinkedHashMap, RowResolver>(); + private final LinkedHashMap, Map> originalOpColumnExprMap = + new LinkedHashMap, Map>(); + + private boolean isPhase1 = true; + private boolean hasMultipleFileSinkOperators = false; + + private Map groupbyNonMapSide2MapSide; + private Map groupbyMapSide2NonMapSide; + + /** + * Transform the query tree. Firstly, find out correlations between operations. + * Then, group these operators in groups + * + * @param pactx + * current parse context + * @throws SemanticException + */ + public ParseContext transform(ParseContext pctx) throws SemanticException { + if (isPhase1) { + pGraphContext = pctx; + opParseCtx = pctx.getOpParseCtx(); + + CorrelationNodePhase1ProcCtx phase1ProcCtx = new CorrelationNodePhase1ProcCtx(); + Map opRules = new LinkedHashMap(); + Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules, + phase1ProcCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topOp nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + isPhase1 = false; + hasMultipleFileSinkOperators = phase1ProcCtx.fileSinkOperatorCount > 1; + } else { + /* + * Types of correlations: + * 1) Input Correlation: Multiple nodes have input correlation + * (IC) if their input relation sets are not disjoint; + * 2) Transit Correlation: Multiple nodes have transit correlation + * (TC) if they have not only input correlation, but + * also the same partition key; + * 3) Job Flow Correlation: A node has job flow correlation + * (JFC) with one of its child nodes if it has the same + * partition key as that child node. + */ + + pGraphContext = pctx; + if (hasMultipleFileSinkOperators) { + //TODO: handle queries with multiple FileSinkOperators; + return pGraphContext; + } + + + opParseCtx = pctx.getOpParseCtx(); + + groupbyNonMapSide2MapSide = pctx.getGroupbyNonMapSide2MapSide(); + groupbyMapSide2NonMapSide = pctx.getGroupbyMapSide2NonMapSide(); + + QB qb = pGraphContext.getQB(); + boolean cannotHandle = !initializeAliastoTabNameMapping(qb); + if (cannotHandle) { + LOG.info("This query or its sub-queries has a null qb. 
" + + "Will not try to optimize it."); + return pGraphContext; + } + + // 0: Replace all map-side group by pattern (GBY-RS-GBY) to + // non-map-side group by pattern (RS-GBY) if necessary + if (pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + for (Entry entry: + groupbyMapSide2NonMapSide.entrySet()) { + GroupByOperator mapSidePatternStart = entry.getKey(); + GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart + .getChildOperators().get(0).getChildOperators().get(0); + ReduceSinkOperator nonMapSidePatternStart = entry.getValue(); + GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart + .getChildOperators().get(0); + + List> parents = mapSidePatternStart.getParentOperators(); + List> children = mapSidePatternEnd.getChildOperators(); + + nonMapSidePatternStart.setParentOperators(parents); + nonMapSidePatternEnd.setChildOperators(children); + + for (Operator parent: parents) { + parent.replaceChild(mapSidePatternStart, nonMapSidePatternStart); + } + for (Operator child: children) { + child.replaceParent(mapSidePatternEnd, nonMapSidePatternEnd); + } + addOperatorInfo(nonMapSidePatternStart); + addOperatorInfo(nonMapSidePatternEnd); + } + } + + // 1: detect correlations + CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx(); + + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"), + new CorrelationNodeProc()); + + Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topOp nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + + // 2: transform the query plan tree + LOG.info("Begain query plan transformation based on intra-query correlations. 
" + + correlationCtx.getCorrelations().size() + " correlation(s) to be applied"); + int correlationsAppliedCount = 0; + for (IntraQueryCorrelation correlation : correlationCtx.getCorrelations()) { + boolean ret = CorrelationOptimizerUtils.applyCorrelation( + correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver, + groupbyNonMapSide2MapSide, originalOpParseCtx); + if (ret) { + correlationsAppliedCount++; + } + } + + // 3: if no correlation applied, replace all non-map-side group by pattern (GBY-RS-GBY) to + // map-side group by pattern (RS-GBY) if necessary + if (correlationsAppliedCount == 0 && + pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + for (Entry entry: + groupbyNonMapSide2MapSide.entrySet()) { + GroupByOperator mapSidePatternStart = entry.getValue(); + GroupByOperator mapSidePatternEnd = (GroupByOperator) mapSidePatternStart + .getChildOperators().get(0).getChildOperators().get(0); + ReduceSinkOperator nonMapSidePatternStart = entry.getKey(); + GroupByOperator nonMapSidePatternEnd = (GroupByOperator) nonMapSidePatternStart + .getChildOperators().get(0); + + List> parents = nonMapSidePatternStart.getParentOperators(); + List> children = nonMapSidePatternEnd.getChildOperators(); + + mapSidePatternStart.setParentOperators(parents); + mapSidePatternEnd.setChildOperators(children); + + for (Operator parent: parents) { + parent.replaceChild(nonMapSidePatternStart, mapSidePatternStart); + } + for (Operator child: children) { + child.replaceParent(nonMapSidePatternEnd, mapSidePatternEnd); + } + } + } + LOG.info("Finish query plan transformation based on intra-query correlations. " + + correlationsAppliedCount + " correlation(s) actually be applied"); + } + return pGraphContext; + } + + private void addOperatorInfo(Operator op) { + OpParseContext opCtx = opParseCtx.get(op); + if (op.getColumnExprMap() != null) { + if (!originalOpColumnExprMap.containsKey(op)) { + originalOpColumnExprMap.put(op, op.getColumnExprMap()); + } + } + if (opCtx != null) { + if (!originalOpParseCtx.containsKey(op)) { + originalOpParseCtx.put(op, opCtx); + } + if (opCtx.getRowResolver() != null) { + if (!originalOpRowResolver.containsKey(op)) { + originalOpRowResolver.put(op, opCtx.getRowResolver()); + } + } + } + } + + private NodeProcessor getPhase1DefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { + Operator op = (Operator) nd; + addOperatorInfo(op); + + if (op.getName().equals(FileSinkOperator.getOperatorName())) { + ((CorrelationNodePhase1ProcCtx)procCtx).fileSinkOperatorCount++; + } + return null; + } + }; + } + + private class CorrelationNodeProc implements NodeProcessor { + + public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop) { + Operator op = rsop.getChildOperators().get(0); + while (!op.getName().equals(ReduceSinkOperator.getOperatorName())) { + if (op.getName().equals(FileSinkOperator.getOperatorName())) { + return null; + } + assert op.getChildOperators().size() <= 1; + op = op.getChildOperators().get(0); + } + return (ReduceSinkOperator) op; + } + + private HashSet findCorrelatedReduceSinkOperators( + Operator op, HashSet keyColumns, + IntraQueryCorrelation correlation) throws Exception { + + LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName()); + + HashSet correlatedReduceSinkOps = new HashSet(); + if (op.getParentOperators() == null) { + return correlatedReduceSinkOps; + } + if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)) { + for (Operator parent : op.getParentOperators()) { + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators( + parent, keyColumns, correlation)); + } + } else if (originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)) { + HashSet newKeyColumns = new HashSet(); + for (String keyColumn : keyColumns) { + ExprNodeDesc col = originalOpColumnExprMap.get(op).get(keyColumn); + if (col instanceof ExprNodeColumnDesc) { + newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn()); + } + } + + if (op.getName().equals(CommonJoinOperator.getOperatorName())) { + HashSet tableNeedToCheck = new HashSet(); + for (String keyColumn : keyColumns) { + for (ColumnInfo cinfo : originalOpParseCtx.get(op).getRowResolver().getColumnInfos()) { + if (keyColumn.equals(cinfo.getInternalName())) { + tableNeedToCheck.add(cinfo.getTabAlias()); + } + } + } + for (Operator parent : op.getParentOperators()) { + Set tableNames = + originalOpParseCtx.get(parent).getRowResolver().getTableNames(); + for (String tbl : tableNames) { + if (tableNeedToCheck.contains(tbl)) { + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators(parent, + newKeyColumns, correlation)); + } + } + } + } else { + for (Operator parent : op.getParentOperators()) { + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators( + parent, newKeyColumns, correlation)); + } + } + } else if (originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator) { + HashSet newKeyColumns = new HashSet(); + for (String keyColumn : keyColumns) { + ExprNodeDesc col = originalOpColumnExprMap.get(op).get(keyColumn); + if (col instanceof ExprNodeColumnDesc) { + newKeyColumns.add(((ExprNodeColumnDesc) col).getColumn()); + } + } + + ReduceSinkOperator rsop = (ReduceSinkOperator) op; + HashSet thisKeyColumns = new HashSet(); + for (ExprNodeDesc key : rsop.getConf().getKeyCols()) { + if (key instanceof ExprNodeColumnDesc) { + thisKeyColumns.add(((ExprNodeColumnDesc) key).getColumn()); + } + } + + boolean isCorrelated = false; + Set intersection = new HashSet(newKeyColumns); + intersection.retainAll(thisKeyColumns); + // TODO: should use if intersection is empty to evaluate if two corresponding operators are + // correlated + // isCorrelated = !(intersection.isEmpty()); + isCorrelated = (intersection.size() == thisKeyColumns.size() && 
!intersection.isEmpty()); + + ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop); + // Since we start the search from those reduceSinkOperator at bottom (near FileSinkOperator), + // we can always find a reduceSinkOperator at a lower level + assert nextChildReduceSinkOperator != null; + if (isCorrelated) { + if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals( + CommonJoinOperator.getOperatorName())) { + if (intersection.size() != nextChildReduceSinkOperator.getConf().getKeyCols().size() || + intersection.size() != rsop.getConf().getKeyCols().size()) { + // Right now, we can only handle identical join keys. + isCorrelated = false; + } + } + } + + if (isCorrelated) { + LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is correlated"); + LOG.info("--keys of this operator: " + thisKeyColumns.toString()); + LOG.info("--keys of child operator: " + keyColumns.toString()); + LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString()); + if (((Operator) (op.getChildOperators().get(0))).getName() + .equals(CommonJoinOperator.getOperatorName())) { + ArrayList peers = + CorrelationOptimizerUtils.findPeerReduceSinkOperators(rsop); + correlatedReduceSinkOps.addAll(peers); + } else { + correlatedReduceSinkOps.add(rsop); + } + // this if block is useful when we use "isCorrelated = !(intersection.isEmpty());" for + // the evaluation of isCorrelated + if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals( + GroupByOperator.getOperatorName()) && + (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())) { + LOG.info("--found a RS-GBY pattern that needs to be replaced to GBY-RS-GBY patterns. " + + " The number of common keys is " + + intersection.size() + + ", and the number of keys of next group by operator" + + nextChildReduceSinkOperator.getConf().getKeyCols().size()); + correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator); + } + } else { + LOG.info("Operator " + op.getIdentifier() + " " + op.getName() + " is not correlated"); + LOG.info("--keys of this operator: " + thisKeyColumns.toString()); + LOG.info("--keys of child operator: " + keyColumns.toString()); + LOG.info("--keys of child operator mapped to this operator:" + newKeyColumns.toString()); + correlatedReduceSinkOps.clear(); + correlation.getRSGBYToBeReplacedByGBYRSGBY().clear(); + } + } else { + throw new Exception("Correlation optimizer: ReduceSinkOperator " + op.getIdentifier() + + " does not have ColumnExprMap"); + } + return correlatedReduceSinkOps; + } + + private HashSet exploitJFC(ReduceSinkOperator op, + CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation) { + + correlationCtx.addWalked(op); + correlation.addToAllReduceSinkOperators(op); + + HashSet reduceSinkOperators = new HashSet(); + + boolean shouldDetect = true; + + ArrayList keys = op.getConf().getKeyCols(); + HashSet keyColumns = new HashSet(); + for (ExprNodeDesc key : keys) { + if (!(key instanceof ExprNodeColumnDesc)) { + shouldDetect = false; + } else { + keyColumns.add(((ExprNodeColumnDesc) key).getColumn()); + } + } + + if (shouldDetect) { + HashSet newReduceSinkOperators = new HashSet(); + for (Operator parent : op.getParentOperators()) { + try { + LOG.info("Operator " + op.getIdentifier() + + ": start detecting correlation from this operator"); + LOG.info("--keys of this operator: " + keyColumns.toString()); + HashSet correlatedReduceSinkOperators = + 
findCorrelatedReduceSinkOperators(parent, keyColumns, correlation); + if (correlatedReduceSinkOperators.size() == 0) { + newReduceSinkOperators.add(op); + } else { + for (ReduceSinkOperator rsop : correlatedReduceSinkOperators) { + + // For two ReduceSinkOperators, we say the one closer to FileSinkOperators is up and + // another one is down + + if (!correlation.getUp2downRSops().containsKey(op)) { + correlation.getUp2downRSops().put(op, new ArrayList()); + } + correlation.getUp2downRSops().get(op).add(rsop); + + if (!correlation.getDown2upRSops().containsKey(rsop)) { + correlation.getDown2upRSops().put(rsop, new ArrayList()); + } + correlation.getDown2upRSops().get(rsop).add(op); + HashSet exploited = exploitJFC(rsop, correlationCtx, + correlation); + if (exploited.size() == 0) { + newReduceSinkOperators.add(rsop); + } else { + newReduceSinkOperators.addAll(exploited); + } + } + } + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + reduceSinkOperators.addAll(newReduceSinkOperators); + } + return reduceSinkOperators; + } + + private TableScanOperator findTableScanOPerator(Operator startPoint) { + Operator thisOp = startPoint.getParentOperators().get(0); + while (true) { + if (thisOp.getName().equals(ReduceSinkOperator.getOperatorName())) { + return null; + } else if (thisOp.getName().equals(TableScanOperator.getOperatorName())) { + return (TableScanOperator) thisOp; + } else { + if (thisOp.getParentOperators() != null) { + thisOp = thisOp.getParentOperators().get(0); + } else { + break; + } + } + } + return null; + } + + private void annotateOpPlan(IntraQueryCorrelation correlation) { + HashMap bottomReduceSink2OperationPath = + new HashMap(); + int indx = 0; + for (ReduceSinkOperator rsop : correlation.getBottomReduceSinkOperators()) { + if (!bottomReduceSink2OperationPath.containsKey(rsop)) { + bottomReduceSink2OperationPath.put(rsop, indx); + for (ReduceSinkOperator peerRSop : CorrelationOptimizerUtils + .findPeerReduceSinkOperators(rsop)) { + if (correlation.getBottomReduceSinkOperators().contains(peerRSop)) { + bottomReduceSink2OperationPath.put(peerRSop, indx); + } + } + indx++; + } + } + correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OperationPath); + } + + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, + Object... 
nodeOutputs) throws SemanticException { + LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " " + + ((Operator) nd).getName()); + addOperatorInfo((Operator) nd); + + CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx) ctx; + + ReduceSinkOperator op = (ReduceSinkOperator) nd; + + if (correlationCtx.isWalked(op)) { + return null; + } + + if (op.getConf().getKeyCols().size() == 0 || + (!op.getChildOperators().get(0).getName().equals(CommonJoinOperator.getOperatorName()) && + !op.getChildOperators().get(0).getName().equals(GroupByOperator.getOperatorName()))) { + correlationCtx.addWalked(op); + return null; + } + + // 1: find out correlation + IntraQueryCorrelation correlation = new IntraQueryCorrelation(); + ArrayList peerReduceSinkOperators = + CorrelationOptimizerUtils.findPeerReduceSinkOperators(op); + ArrayList bottomReduceSinkOperators = new ArrayList(); + for (ReduceSinkOperator rsop : peerReduceSinkOperators) { + HashSet thisBottomReduceSinkOperators = exploitJFC(rsop, + correlationCtx, correlation); + + // TODO: if we use "isCorrelated = !(intersection.isEmpty());" in the method + // findCorrelatedReduceSinkOperators + // for the evaluation of isCorrelated, uncomment the following if block. The + // reduceSinkOperator at the top level + // should take special care, since we cannot evaluate if the relationship of the set of keys + // of this operator with + // that of its next parent reduceSinkOperator. + // if (peerReduceSinkOperators.size() == 1) { + // correlation.addToRSGBYToBeReplacedByGBYRSGBY(rsop); + // } + if (thisBottomReduceSinkOperators.size() == 0) { + thisBottomReduceSinkOperators.add(rsop); + } else { + boolean isClear = false; + // bottom ReduceSinkOperators are those ReduceSinkOperators which are close to + // TableScanOperators + for (ReduceSinkOperator bottomRsop : thisBottomReduceSinkOperators) { + TableScanOperator tsop = findTableScanOPerator(bottomRsop); + if (tsop == null) { + isClear = true; // currently the optimizer can only optimize correlations involving + // source tables (input tables) + } else { + // bottom ReduceSinkOperators are those ReduceSinkOperators which are close to + // FileSinkOperators + if (!correlation.getTop2TSops().containsKey(rsop)) { + correlation.getTop2TSops().put(rsop, new ArrayList()); + } + correlation.getTop2TSops().get(rsop).add(tsop); + + if (!correlation.getBottom2TSops().containsKey(bottomRsop)) { + correlation.getBottom2TSops().put(bottomRsop, new ArrayList()); + } + correlation.getBottom2TSops().get(bottomRsop).add(tsop); + } + } + if (isClear) { + thisBottomReduceSinkOperators.clear(); + thisBottomReduceSinkOperators.add(rsop); + } + } + bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators); + } + + if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) { + LOG.info("has job flow correlation"); + correlation.setJobFlowCorrelation(true); + correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators); + annotateOpPlan(correlation); + } + + if (correlation.hasJobFlowCorrelation()) { + boolean hasICandTC = findICandTC(correlation); + LOG.info("has input correlation and transit correlation? " + hasICandTC); + correlation.setInputCorrelation(hasICandTC); + correlation.setTransitCorrelation(hasICandTC); + boolean hasSelfJoin = hasSelfJoin(correlation); + LOG.info("has self-join? " + hasSelfJoin); + correlation.setInvolveSelfJoin(hasSelfJoin); + // TODO: support self-join involved cases. 
For self-join related operation paths, after the + // correlation dispatch operator, each path should be filtered by a filter operator + if (!hasSelfJoin) { + LOG.info("correlation detected"); + correlationCtx.addCorrelation(correlation); + } else { + LOG.info("correlation discarded. The current optimizer cannot optimize it"); + } + } + correlationCtx.addWalkedAll(peerReduceSinkOperators); + return null; + } + + private boolean hasSelfJoin(IntraQueryCorrelation correlation) { + boolean hasSelfJoin = false; + for (Entry> entry : correlation + .getTable2CorrelatedRSops().entrySet()) { + for (ReduceSinkOperator rsop : entry.getValue()) { + HashSet intersection = new HashSet( + CorrelationOptimizerUtils.findPeerReduceSinkOperators(rsop)); + intersection.retainAll(entry.getValue()); + // if self-join is involved + if (intersection.size() > 1) { + hasSelfJoin = true; + return hasSelfJoin; + } + } + } + return hasSelfJoin; + } + + private boolean findICandTC(IntraQueryCorrelation correlation) { + + boolean hasICandTC = false; + HashMap> table2RSops = + new HashMap>(); + HashMap> table2TSops = + new HashMap>(); + + for (Entry> entry : correlation + .getBottom2TSops().entrySet()) { + String tbl = AliastoTabName.get(entry.getValue().get(0).getConf().getAlias()); + if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)) { + table2RSops.put(tbl, new ArrayList()); + table2TSops.put(tbl, new ArrayList()); + } + assert entry.getValue().size() == 1; + table2RSops.get(tbl).add(entry.getKey()); + table2TSops.get(tbl).add(entry.getValue().get(0)); + } + + for (Entry> entry : table2RSops.entrySet()) { + if (entry.getValue().size() > 1) { + hasICandTC = true; + break; + } + } + correlation.setICandTCCorrelation(table2RSops, table2TSops); + return hasICandTC; + } + } + + private NodeProcessor getDefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + LOG.info("Walk to operator " + ((Operator) nd).getIdentifier() + " " + + ((Operator) nd).getName()); + addOperatorInfo((Operator) nd); + return null; + } + }; + } + + public class IntraQueryCorrelation { + + private final HashMap> down2upRSops = + new HashMap>(); + private final HashMap> up2downRSops = + new HashMap>(); + + private final HashMap> top2TSops = + new HashMap>(); + private final HashMap> bottom2TSops = + new HashMap>(); + + private ArrayList topReduceSinkOperators; + private ArrayList bottomReduceSinkOperators; + + private HashMap> table2CorrelatedRSops; + + private HashMap> table2CorrelatedTSops; + + private HashMap bottomReduceSink2OperationPathMap; + + private final HashMap>> dispatchConf = + new HashMap>>(); // inputTag->(Child->outputTag) + private final HashMap>> dispatchValueSelectDescConf = + new HashMap>>(); // inputTag->(Child->SelectDesc) + private final HashMap>> dispatchKeySelectDescConf = + new HashMap>>(); // inputTag->(Child->SelectDesc) + + private final HashSet allReduceSinkOperators = + new HashSet(); + + // this set contains all ReduceSink-GroupBy operator-pairs that should be be replaced by + // GroupBy-ReduceSink-GroupBy pattern. + // the type of first GroupByOperator is hash type and this one will be used to group records. 
+ private final HashSet rSGBYToBeReplacedByGBYRSGBY = + new HashSet(); + + public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop) { + rSGBYToBeReplacedByGBYRSGBY.add(rsop); + } + + public HashSet getRSGBYToBeReplacedByGBYRSGBY() { + return rSGBYToBeReplacedByGBYRSGBY; + } + + public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) { + allReduceSinkOperators.add(rsop); + } + + public HashSet getAllReduceSinkOperators() { + return allReduceSinkOperators; + } + + public HashMap>> getDispatchConf() { + return dispatchConf; + } + + public HashMap>> getDispatchValueSelectDescConf() { + return dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return dispatchKeySelectDescConf; + } + + public void addOperationPathToDispatchConf(Integer opPlan) { + if (!dispatchConf.containsKey(opPlan)) { + dispatchConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchConfForOperationPath(Integer opPlan) { + return dispatchConf.get(opPlan); + } + + public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) { + if (!dispatchValueSelectDescConf.containsKey(opPlan)) { + dispatchValueSelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchValueSelectDescConfForOperationPath( + Integer opPlan) { + return dispatchValueSelectDescConf.get(opPlan); + } + + public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) { + if (!dispatchKeySelectDescConf.containsKey(opPlan)) { + dispatchKeySelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchKeySelectDescConfForOperationPath( + Integer opPlan) { + return dispatchKeySelectDescConf.get(opPlan); + } + + private boolean inputCorrelation = false; + private boolean transitCorrelation = false; + private boolean jobFlowCorrelation = false; + + public void setBottomReduceSink2OperationPathMap( + HashMap bottomReduceSink2OperationPathMap) { + this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap; + } + + public HashMap getBottomReduceSink2OperationPathMap() { + return bottomReduceSink2OperationPathMap; + } + + public void setInputCorrelation(boolean inputCorrelation) { + this.inputCorrelation = inputCorrelation; + } + + public boolean hasInputCorrelation() { + return inputCorrelation; + } + + public void setTransitCorrelation(boolean transitCorrelation) { + this.transitCorrelation = transitCorrelation; + } + + public boolean hasTransitCorrelation() { + return transitCorrelation; + } + + public void setJobFlowCorrelation(boolean jobFlowCorrelation) { + this.jobFlowCorrelation = jobFlowCorrelation; + } + + public boolean hasJobFlowCorrelation() { + return jobFlowCorrelation; + } + + public HashMap> getTop2TSops() { + return top2TSops; + } + + public HashMap> getBottom2TSops() { + return bottom2TSops; + } + + public HashMap> getDown2upRSops() { + return down2upRSops; + } + + public HashMap> getUp2downRSops() { + return up2downRSops; + } + + public void setJFCCorrelation(ArrayList peerReduceSinkOperators, + ArrayList bottomReduceSinkOperators) { + this.topReduceSinkOperators = peerReduceSinkOperators; + this.bottomReduceSinkOperators = bottomReduceSinkOperators; + } + + + public ArrayList getTopReduceSinkOperators() { + return topReduceSinkOperators; + } + + public ArrayList getBottomReduceSinkOperators() { + return bottomReduceSinkOperators; + } + + public void setICandTCCorrelation(HashMap> table2RSops, + HashMap> table2TSops) { + this.table2CorrelatedRSops = table2RSops; + this.table2CorrelatedTSops = 
table2TSops; + } + + public HashMap> getTable2CorrelatedRSops() { + return table2CorrelatedRSops; + } + + public HashMap> getTable2CorrelatedTSops() { + return table2CorrelatedTSops; + } + + private boolean isInvolveSelfJoin = false; + + public boolean isInvolveSelfJoin() { + return isInvolveSelfJoin; + } + + public void setInvolveSelfJoin(boolean isInvolveSelfJoin) { + this.isInvolveSelfJoin = isInvolveSelfJoin; + } + + } + + private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx { + public int fileSinkOperatorCount = 0; + } + + private class CorrelationNodeProcCtx implements NodeProcessorCtx { + + private final HashSet walked = new HashSet(); + + private final ArrayList correlations = + new ArrayList(); + + public void addCorrelation(IntraQueryCorrelation correlation) { + correlations.add(correlation); + } + + public ArrayList getCorrelations() { + return correlations; + } + + public boolean isWalked(ReduceSinkOperator op) { + return walked.contains(op); + } + + public void addWalked(ReduceSinkOperator op) { + walked.add(op); + } + + public void addWalkedAll(Collection c) { + walked.addAll(c); + } + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java new file mode 100644 index 0000000..65bbd97 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java @@ -0,0 +1,805 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator; +import org.apache.hadoop.hive.ql.exec.CorrelationLocalSimulativeReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.ForwardDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; + + +public final class CorrelationOptimizerUtils { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName()); + + public static boolean isExisted(ExprNodeDesc expr, ArrayList col_list) { + for (ExprNodeDesc thisExpr : col_list) { + if (expr.getExprString().equals(thisExpr.getExprString())) { + return true; + } + } + return false; + } + + public static String getColumnName(Map opColumnExprMap, ExprNodeDesc expr) { + for (Entry entry : opColumnExprMap.entrySet()) { + if (expr.getExprString().equals(entry.getValue().getExprString())) { + return entry.getKey(); + } + } + return null; + } + + + public static Operator unionUsedColumnsAndMakeNewSelect( + ArrayList rsops, + IntraQueryCorrelation correlation, LinkedHashMap, + Map> originalOpColumnExprMap, TableScanOperator input, + ParseContext pGraphContext, + LinkedHashMap, OpParseContext> originalOpParseCtx) { + + ArrayList columnNames = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList col_list = new ArrayList(); + RowResolver out_rwsch = new RowResolver(); + boolean isSelectAll = false; + + int pos = 0; + for (ReduceSinkOperator rsop : rsops) { + Operator curr = correlation.getBottom2TSops().get(rsop).get(0) + .getChildOperators().get(0); + while (true) { + if 
(curr.getName().equals(SelectOperator.getOperatorName())) { + SelectOperator selOp = (SelectOperator) curr; + if (selOp.getColumnExprMap() != null) { + for (Entry entry : selOp.getColumnExprMap().entrySet()) { + ExprNodeDesc expr = entry.getValue(); + if (!isExisted(expr, col_list) + && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap() + .containsKey(entry.getKey())) { + col_list.add(expr); + String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap() + .get(entry.getKey()); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = entry.getKey(); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + pos++; + columnNames.add(outputName); + colExprMap.put(outputName, expr); + } + } + } else { + for (ExprNodeDesc expr : selOp.getConf().getColList()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver() + .getInvRslvMap().get(expr.getCols().get(0)); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + } + break; + } else if (curr.getName().equals(FilterOperator.getOperatorName())) { + isSelectAll = true; + break; + } else if (curr.getName().equals(ReduceSinkOperator.getOperatorName())) { + ReduceSinkOperator thisRSop = (ReduceSinkOperator) curr; + for (ExprNodeDesc expr : thisRSop.getConf().getKeyCols()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver() + .getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + for (ExprNodeDesc expr : thisRSop.getConf().getValueCols()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver() + .getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + + break; + } else { + curr = curr.getChildOperators().get(0); + } + } + } + + Operator output; + if (isSelectAll) { + output = input; + } else { + output = putOpInsertMap(OperatorFactory.getAndMakeChild( + new SelectDesc(col_list, columnNames, false), new RowSchema( + out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx()); + output.setColumnExprMap(colExprMap); + output.setChildOperators(Utilities.makeList()); + + } + + return output; + } + + + public static Operator putOpInsertMap( + Operator op, + RowResolver rr, LinkedHashMap, OpParseContext> opParseCtx) { + OpParseContext ctx = new OpParseContext(rr); + 
opParseCtx.put(op, ctx); + op.augmentPlan(); + return op; + } + + public static HashMap, String> getAliasIDtTopOps( + HashMap> topOps) { + HashMap, String> aliasIDtTopOps = + new HashMap, String>(); + for (Entry> entry : topOps.entrySet()) { + assert !aliasIDtTopOps.containsKey(entry.getValue()); + aliasIDtTopOps.put(entry.getValue(), entry.getKey()); + } + return aliasIDtTopOps; + } + + /** + * Find all peer ReduceSinkOperators (which have the same child operator of op) of op (op + * included). + */ + public static ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op) { + ArrayList peerReduceSinkOperators = new ArrayList(); + List> children = op.getChildOperators(); + assert children.size() == 1; // A ReduceSinkOperator should have only one child + for (Operator parent : children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator) parent); + } + return peerReduceSinkOperators; + } + + public static ArrayList findPeerFakeReduceSinkOperators( + CorrelationLocalSimulativeReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = + new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent : children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((CorrelationLocalSimulativeReduceSinkOperator) parent); + } + + return peerReduceSinkOperators; + } + + public static boolean applyCorrelation( + IntraQueryCorrelation correlation, + ParseContext inputpGraphContext, + LinkedHashMap, Map> originalOpColumnExprMap, + LinkedHashMap, RowResolver> originalOpRowResolver, + Map groupbyRegular2MapSide, + LinkedHashMap, OpParseContext> originalOpParseCtx) + throws SemanticException { + + ParseContext pGraphContext = inputpGraphContext; + + // 0: if necessary, replace RS-GBY to GBY-RS-GBY. 
In GBY-RS-GBY, the type of the first GBY is + // hash, so it can group records + LOG.info("apply correlation step 0: replace RS-GBY to GBY-RS-GBY"); + for (ReduceSinkOperator rsop : correlation.getRSGBYToBeReplacedByGBYRSGBY()) { + LOG.info("operator " + rsop.getIdentifier() + " should be replaced"); + assert !correlation.getBottomReduceSinkOperators().contains(rsop); + GroupByOperator mapSideGBY = groupbyRegular2MapSide.get(rsop); + assert (mapSideGBY.getChildOperators().get(0).getChildOperators().get(0) instanceof GroupByOperator); + ReduceSinkOperator newRsop = (ReduceSinkOperator) mapSideGBY.getChildOperators().get(0); + GroupByOperator reduceSideGBY = (GroupByOperator) newRsop.getChildOperators().get(0); + GroupByOperator oldReduceSideGBY = (GroupByOperator) rsop.getChildOperators().get(0); + List> parents = rsop.getParentOperators(); + List> children = oldReduceSideGBY.getChildOperators(); + mapSideGBY.setParentOperators(parents); + for (Operator parent : parents) { + parent.replaceChild(rsop, mapSideGBY); + } + reduceSideGBY.setChildOperators(children); + for (Operator child : children) { + child.replaceParent(oldReduceSideGBY, reduceSideGBY); + } + correlation.getAllReduceSinkOperators().remove(rsop); + correlation.getAllReduceSinkOperators().add(newRsop); + } + + + Operator curr; + + // 1: Create table scan operator + LOG.info("apply correlation step 1: create table scan operator"); + HashMap oldTSOP2newTSOP = + new HashMap(); + HashMap> oldTopOps = pGraphContext.getTopOps(); + HashMap, String> oldAliasIDtTopOps = + getAliasIDtTopOps(oldTopOps); + HashMap oldTopToTable = pGraphContext.getTopToTable(); + HashMap> addedTopOps = + new HashMap>(); + HashMap addedTopToTable = new HashMap(); + for (Entry> entry : correlation.getTable2CorrelatedTSops() + .entrySet()) { + TableScanOperator oldTSop = entry.getValue().get(0); + TableScanDesc tsDesc = new TableScanDesc(oldTSop.getConf().getAlias(), oldTSop.getConf() + .getVirtualCols()); + tsDesc.setForwardRowNumber(true); + OpParseContext opParseCtx = pGraphContext.getOpParseCtx().get(oldTSop); + Operator top = putOpInsertMap(OperatorFactory.get(tsDesc, + new RowSchema(opParseCtx.getRowResolver().getColumnInfos())), + opParseCtx.getRowResolver(), pGraphContext.getOpParseCtx()); + top.setParentOperators(null); + top.setChildOperators(Utilities.makeList()); + for (TableScanOperator tsop : entry.getValue()) { + addedTopOps.put(oldAliasIDtTopOps.get(tsop), top); + addedTopToTable.put((TableScanOperator) top, oldTopToTable.get(tsop)); + oldTSOP2newTSOP.put(tsop, (TableScanOperator) top); + } + } + + ArrayList> childrenOfDispatch = + new ArrayList>(); + for (ReduceSinkOperator rsop : correlation.getBottomReduceSinkOperators()) { + // TODO: currently, correlation optimizer can not handle the case that + // a table is directly connected to a post computation operator. e.g. + // Join + // / \ + // GBY T2 + // | + // T1 + if (!correlation.getBottomReduceSinkOperators() + .containsAll(findPeerReduceSinkOperators(rsop))) { + LOG.info("Can not handle the case that " + + "a table is directly connected to a post computation operator. 
Use original plan"); + return false; + } + Operator op = rsop.getChildOperators().get(0); + if (!childrenOfDispatch.contains(op)) { + LOG.info("Add :" + op.getIdentifier() + " " + op.getName() + + " to the children list of dispatch operator"); + childrenOfDispatch.add(op); + } + } + + int opTag = 0; + HashMap operationPath2CorrelationReduceSinkOps = + new HashMap(); + for (Entry> entry : correlation + .getTable2CorrelatedRSops().entrySet()) { + + // 2: Create select operator for shared operation paths + LOG.info("apply correlation step 2: create select operator for shared operation path for the table of " + + entry.getKey()); + curr = + unionUsedColumnsAndMakeNewSelect(entry.getValue(), correlation, originalOpColumnExprMap, + oldTSOP2newTSOP + .get(correlation.getBottom2TSops().get(entry.getValue().get(0)).get(0)), + pGraphContext, originalOpParseCtx); + + // 3: Create CorrelationCompositeOperator, CorrelationReduceSinkOperator + LOG.info("apply correlation step 3: create correlation composite Operator and correlation reduce sink operator for the table of " + + entry.getKey()); + curr = + createCorrelationCompositeReducesinkOperaotr( + correlation.getTable2CorrelatedTSops().get(entry.getKey()), entry.getValue(), + correlation, curr, pGraphContext, + childrenOfDispatch, entry.getKey(), originalOpColumnExprMap, opTag, + originalOpRowResolver); + + operationPath2CorrelationReduceSinkOps.put(new Integer(opTag), (ReduceSinkOperator) curr); + opTag++; + } + + + // 4: Create correlation dispatch operator for operation paths + LOG.info("apply correlation step 4: create correlation dispatch operator for operation paths"); + RowResolver outputRS = new RowResolver(); + List> correlationReduceSinkOps = + new ArrayList>(); + for (Entry entry : operationPath2CorrelationReduceSinkOps + .entrySet()) { + curr = entry.getValue(); + correlationReduceSinkOps.add(curr); + RowResolver inputRS = pGraphContext.getOpParseCtx().get(curr).getRowResolver(); + for (Entry> e1 : inputRS.getRslvMap().entrySet()) { + for (Entry e2 : e1.getValue().entrySet()) { + outputRS.put(e1.getKey(), e2.getKey(), e2.getValue()); + } + } + } + + Operator dispatchOp = putOpInsertMap(OperatorFactory.get( + new CorrelationReducerDispatchDesc(correlation.getDispatchConf(), correlation + .getDispatchKeySelectDescConf(), correlation.getDispatchValueSelectDescConf()), + new RowSchema(outputRS.getColumnInfos())), + outputRS, pGraphContext.getOpParseCtx()); + + dispatchOp.setParentOperators(correlationReduceSinkOps); + for (Operator thisOp : correlationReduceSinkOps) { + thisOp.setChildOperators(Utilities.makeList(dispatchOp)); + } + + // 5: Replace the old plan in the original plan tree with new plan + LOG.info("apply correlation step 5: Replace the old plan in the original plan tree with the new plan"); + HashSet> processed = + new HashSet>(); + for (Operator op : childrenOfDispatch) { + ArrayList> parents = + new ArrayList>(); + for (Operator oldParent : op.getParentOperators()) { + if (!correlation.getBottomReduceSinkOperators().contains(oldParent)) { + parents.add(oldParent); + } + } + parents.add(dispatchOp); + op.setParentOperators(parents); + } + dispatchOp.setChildOperators(childrenOfDispatch); + HashMap> newTopOps = + new HashMap>(); + for (Entry> entry : oldTopOps.entrySet()) { + if (addedTopOps.containsKey(entry.getKey())) { + newTopOps.put(entry.getKey(), addedTopOps.get(entry.getKey())); + } else { + newTopOps.put(entry.getKey(), entry.getValue()); + } + } + pGraphContext.setTopOps(newTopOps); + HashMap newTopToTable = new 
HashMap(); + for (Entry entry : oldTopToTable.entrySet()) { + if (addedTopToTable.containsKey(oldTSOP2newTSOP.get(entry.getKey()))) { + newTopToTable.put(oldTSOP2newTSOP.get(entry.getKey()), + addedTopToTable.get(oldTSOP2newTSOP.get(entry.getKey()))); + } else { + newTopToTable.put(entry.getKey(), entry.getValue()); + } + } + pGraphContext.setTopToTable(newTopToTable); + + // 6: Change every JFC related ReduceSinkOperator to a + // CorrelationLocalSimulativeReduceSinkOperator + LOG.info("apply correlation step 6: Change every JFC related reduce sink operator to a " + + "CorrelationLocalSimulativeReduceSinkOperator"); + for (ReduceSinkOperator rsop : correlation.getAllReduceSinkOperators()) { + if (!correlation.getBottomReduceSinkOperators().contains(rsop)) { + Operator childOP = rsop.getChildOperators().get(0); + Operator parentOP = rsop.getParentOperators().get(0); + Operator correlationFakeReduceSinkOperator = + putOpInsertMap( + OperatorFactory.get( + new CorrelationLocalSimulativeReduceSinkDesc(rsop.getConf()), + new RowSchema(pGraphContext.getOpParseCtx().get(rsop).getRowResolver() + .getColumnInfos())), + pGraphContext.getOpParseCtx().get(rsop).getRowResolver(), + pGraphContext.getOpParseCtx()); + correlationFakeReduceSinkOperator.setChildOperators(Utilities.makeList(childOP)); + correlationFakeReduceSinkOperator.setParentOperators(Utilities.makeList(parentOP)); + parentOP.getChildOperators().set(parentOP.getChildOperators().indexOf(rsop), + correlationFakeReduceSinkOperator); + childOP.getParentOperators().set(childOP.getParentOperators().indexOf(rsop), + correlationFakeReduceSinkOperator); + } + } + return true; + } + + public static Operator createCorrelationCompositeReducesinkOperaotr( + ArrayList tsops, + ArrayList rsops, + IntraQueryCorrelation correlation, + Operator input, + ParseContext pGraphContext, + ArrayList> childrenOfDispatch, + String tableName, + LinkedHashMap, Map> originalOpColumnExprMap, + int newTag, + LinkedHashMap, RowResolver> originalOpRowResolver) + throws SemanticException { + + // Create CorrelationCompositeOperator + RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver(); + ArrayList> tops = + new ArrayList>(); + ArrayList> bottoms = + new ArrayList>(); + ArrayList opTags = new ArrayList(); + + for (ReduceSinkOperator rsop : rsops) { + TableScanOperator tsop = correlation.getBottom2TSops().get(rsop).get(0); + Operator curr = tsop.getChildOperators().get(0); + if (curr == rsop) { + // no filter needed, just forward + ForwardDesc forwardCtx = new ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + } else { + // Add filter operator + FilterOperator currFilOp = null; + while (curr != rsop) { + if (curr.getName().equals("FIL")) { + FilterOperator fil = (FilterOperator) curr; + FilterDesc filterCtx = new FilterDesc(fil.getConf().getPredicate(), false); + Operator nowFilOp = OperatorFactory.get(FilterDesc.class); + nowFilOp.setConf(filterCtx); + if (currFilOp == null) { + currFilOp = (FilterOperator) nowFilOp; + tops.add(currFilOp); + } else { + nowFilOp.setParentOperators(Utilities.makeList(currFilOp)); + currFilOp.setChildOperators(Utilities.makeList(nowFilOp)); + currFilOp = (FilterOperator) nowFilOp; + } + } + curr = curr.getChildOperators().get(0); + } + if (currFilOp == null) { + ForwardDesc forwardCtx = new ForwardDesc(); + Operator forwardOp = 
OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + } else { + bottoms.add(currFilOp); + } + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + + } + } + + int[] opTagsArray = new int[opTags.size()]; + for (int i = 0; i < opTags.size(); i++) { + opTagsArray[i] = opTags.get(i).intValue(); + } + + for (Operator op : bottoms) { + op.setParentOperators(Utilities.makeList(input)); + } + input.setChildOperators(bottoms); + + CorrelationCompositeDesc ycoCtx = new CorrelationCompositeDesc(); + ycoCtx.setAllOperationPathTags(opTagsArray); + + Operator ycop = putOpInsertMap(OperatorFactory.get(ycoCtx, + new RowSchema(inputRR.getColumnInfos())), + inputRR, pGraphContext.getOpParseCtx()); + ycop.setParentOperators(tops); + for (Operator op : tops) { + op.setChildOperators(Utilities.makeList(ycop)); + } + + // Create CorrelationReduceSinkOperator + ArrayList partitionCols = new ArrayList(); + ArrayList keyCols = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList keyOutputColumnNames = new ArrayList(); + ReduceSinkOperator firstRsop = rsops.get(0); + + RowResolver firstRsopRS = pGraphContext.getOpParseCtx().get(firstRsop).getRowResolver(); + RowResolver orginalFirstRsopRS = originalOpRowResolver.get(firstRsop); + RowResolver outputRS = new RowResolver(); + HashMap keyCol2ExprForDispatch = + new HashMap(); + HashMap valueCol2ExprForDispatch = + new HashMap(); + + for (ExprNodeDesc expr : firstRsop.getConf().getKeyCols()) { + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc) expr; + String ouputName = getColumnName(originalOpColumnExprMap.get(firstRsop), expr); + ColumnInfo cinfo = orginalFirstRsopRS.getColumnInfos().get( + orginalFirstRsopRS.getPosition(ouputName)); + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size()); + keyOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + + colExprMap.put(newColInfo.getInternalName(), expr); + + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + keyCols.add(expr); + + keyCol2ExprForDispatch.put(encd.getColumn(), new ExprNodeColumnDesc(cinfo.getType(), col, + tableName, + encd.getIsPartitionColOrVirtualCol())); + + } + + ArrayList valueCols = new ArrayList(); + ArrayList valueOutputColumnNames = new ArrayList(); + + correlation.addOperationPathToDispatchConf(newTag); + correlation.addOperationPathToDispatchKeySelectDescConf(newTag); + correlation.addOperationPathToDispatchValueSelectDescConf(newTag); + + + for (ReduceSinkOperator rsop : rsops) { + LOG.debug("Analyzing ReduceSinkOperator " + rsop.getIdentifier()); + RowResolver rs = pGraphContext.getOpParseCtx().get(rsop).getRowResolver(); + RowResolver orginalRS = originalOpRowResolver.get(rsop); + Integer childOpIndex = childrenOfDispatch.indexOf(rsop.getChildOperators().get(0)); + int outputTag = rsop.getConf().getTag(); + if (outputTag == -1) { + outputTag = 0; + } + if (!correlation.getDispatchConfForOperationPath(newTag).containsKey(childOpIndex)) { + correlation.getDispatchConfForOperationPath(newTag).put(childOpIndex, + new ArrayList()); + } + correlation.getDispatchConfForOperationPath(newTag).get(childOpIndex).add(outputTag); + + ArrayList thisKeyColsInDispatch = new ArrayList(); + ArrayList outputKeyNamesInDispatch = new ArrayList(); + for (ExprNodeDesc expr : rsop.getConf().getKeyCols()) { + assert expr instanceof 
ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc) expr; + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + LOG.debug("key column: " + outputName); + thisKeyColsInDispatch.add(keyCol2ExprForDispatch.get(encd.getColumn())); + String[] names = outputName.split("\\."); + String outputKeyName = ""; + switch (names.length) { + case 1: + outputKeyName = names[0]; + break; + case 2: + outputKeyName = names[1]; + break; + default: + throw (new SemanticException("found a un-sopported internal key name structure")); + } + outputKeyNamesInDispatch.add(outputKeyName); + } + + if (!correlation.getDispatchKeySelectDescConfForOperationPath(newTag).containsKey( + childOpIndex)) { + correlation.getDispatchKeySelectDescConfForOperationPath(newTag).put(childOpIndex, + new ArrayList()); + } + correlation.getDispatchKeySelectDescConfForOperationPath(newTag).get(childOpIndex). + add(new SelectDesc(thisKeyColsInDispatch, outputKeyNamesInDispatch, false)); + + ArrayList thisValueColsInDispatch = new ArrayList(); + ArrayList outputValueNamesInDispatch = new ArrayList(); + for (ExprNodeDesc expr : rsop.getConf().getValueCols()) { + + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + LOG.debug("value column: " + outputName); + LOG.debug("originalOpColumnExprMap.get(rsop):" + originalOpColumnExprMap.get(rsop) + + " expr:" + expr.toString() + + " orginalRS.getColumnInfos().toString:" + orginalRS.getColumnInfos().toString() + " " + + outputName); + ColumnInfo cinfo = orginalRS.getColumnInfos().get(orginalRS.getPosition(outputName)); + if (!valueCol2ExprForDispatch.containsKey(expr.getExprString())) { + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size() + valueCols.size()); + valueOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + colExprMap.put(newColInfo.getInternalName(), expr); + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + valueCols.add(expr); + + valueCol2ExprForDispatch.put(expr.getExprString(), new ExprNodeColumnDesc( + cinfo.getType(), col, tableName, + false)); + } + + thisValueColsInDispatch.add(valueCol2ExprForDispatch.get(expr.getExprString())); + String[] names = outputName.split("\\."); + String outputValueName = ""; + switch (names.length) { + case 1: + outputValueName = names[0]; + break; + case 2: + outputValueName = names[1]; + break; + default: + throw (new SemanticException("found a un-sopported internal value name structure")); + } + outputValueNamesInDispatch.add(outputValueName); + } + + if (!correlation.getDispatchValueSelectDescConfForOperationPath(newTag).containsKey( + childOpIndex)) { + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).put(childOpIndex, + new ArrayList()); + } + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).get(childOpIndex). 
+ add(new SelectDesc(thisValueColsInDispatch, outputValueNamesInDispatch, false)); + } + + ReduceSinkOperator rsOp = null; + try { + rsOp = (ReduceSinkOperator) putOpInsertMap( + OperatorFactory.getAndMakeChild(getReduceSinkDesc(keyCols, + keyCols.size(), valueCols, new ArrayList>(), + keyOutputColumnNames, valueOutputColumnNames, true, newTag, keyCols.size(), + -1), new RowSchema(outputRS + .getColumnInfos()), ycop), outputRS, pGraphContext.getOpParseCtx()); + rsOp.setColumnExprMap(colExprMap); + ((CorrelationCompositeOperator) ycop).getConf().setCorrespondingReduceSinkOperator(rsOp); + } catch (SemanticException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return rsOp; + } + + + /** + * Create the correlation reduce sink descriptor. + * + * @param keyCols + * The columns to be stored in the key + * @param numKeys + * number of distribution keys. Equals to group-by-key + * numbers usually. + * @param valueCols + * The columns to be stored in the value + * @param distinctColIndices + * column indices for distinct aggregates + * @param outputKeyColumnNames + * The output key columns names + * @param outputValueColumnNames + * The output value columns names + * @param tag + * The tag for this reducesink + * @param numPartitionFields + * The first numPartitionFields of keyCols will be partition columns. + * If numPartitionFields=-1, then partition randomly. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The YSmartReduceSinkDesc object. + */ + public static ReduceSinkDesc getReduceSinkDesc( + ArrayList keyCols, int numKeys, + ArrayList valueCols, + List> distinctColIndices, + List outputKeyColumnNames, List outputValueColumnNames, + boolean includeKey, int tag, + int numPartitionFields, int numReducers) throws SemanticException { + ArrayList partitionCols = null; + + if (numPartitionFields >= keyCols.size()) { + partitionCols = keyCols; + } else if (numPartitionFields >= 0) { + partitionCols = new ArrayList(numPartitionFields); + for (int i = 0; i < numPartitionFields; i++) { + partitionCols.add(keyCols.get(i)); + } + } else { + // numPartitionFields = -1 means random partitioning + partitionCols = new ArrayList(1); + partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor + .getFuncExprNodeDesc("rand")); + } + + StringBuilder order = new StringBuilder(); + for (int i = 0; i < keyCols.size(); i++) { + order.append("+"); + } + + TableDesc keyTable = null; + TableDesc valueTable = null; + ArrayList outputKeyCols = new ArrayList(); + ArrayList outputValCols = new ArrayList(); + if (includeKey) { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnListWithLength( + keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""), + order.toString()); + outputKeyCols.addAll(outputKeyColumnNames); + } else { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList( + keyCols, "reducesinkkey"), order.toString()); + for (int i = 0; i < keyCols.size(); i++) { + outputKeyCols.add("reducesinkkey" + i); + } + } + valueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueCols, outputValueColumnNames, 0, "")); + outputValCols.addAll(outputValueColumnNames); + + return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols, + distinctColIndices, outputValCols, + tag, partitionCols, numReducers, keyTable, + valueTable, true); + } + +} diff --git 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 6bc5fe4..a86fc58 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -85,7 +85,7 @@ public final class GenMapRedUtils { /** * Initialize the current plan by adding it to root tasks. - * + * * @param op * the reduce sink operator encountered * @param opProcCtx @@ -95,12 +95,12 @@ public final class GenMapRedUtils { throws SemanticException { Operator reducer = op.getChildOperators().get(0); Map, GenMapRedCtx> mapCurrCtx = - opProcCtx.getMapCurrCtx(); + opProcCtx.getMapCurrCtx(); GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0)); Task currTask = mapredCtx.getCurrTask(); MapredWork plan = (MapredWork) currTask.getWork(); HashMap, Task> opTaskMap = - opProcCtx.getOpTaskMap(); + opProcCtx.getOpTaskMap(); Operator currTopOp = opProcCtx.getCurrTopOp(); opTaskMap.put(reducer, currTask); @@ -112,10 +112,15 @@ public final class GenMapRedUtils { List> rootTasks = opProcCtx.getRootTasks(); if (!rootTasks.contains(currTask)) { - rootTasks.add(currTask); + rootTasks.add(currTask); } if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); + } + if (op.getConf().getNeedsOperationPathTagging()) { + plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(true); } assert currTopOp != null; @@ -136,15 +141,15 @@ public final class GenMapRedUtils { } public static void initMapJoinPlan( - Operator op, GenMRProcContext ctx, - boolean readInputMapJoin, boolean readInputUnion, boolean setReducer, int pos) - throws SemanticException { + Operator op, GenMRProcContext ctx, + boolean readInputMapJoin, boolean readInputUnion, boolean setReducer, int pos) + throws SemanticException { initMapJoinPlan(op, ctx, readInputMapJoin, readInputUnion, setReducer, pos, false); } /** * Initialize the current plan by adding it to root tasks. - * + * * @param op * the map join operator encountered * @param opProcCtx @@ -157,15 +162,15 @@ public final class GenMapRedUtils { boolean readInputUnion, boolean setReducer, int pos, boolean createLocalPlan) throws SemanticException { Map, GenMapRedCtx> mapCurrCtx = - opProcCtx.getMapCurrCtx(); + opProcCtx.getMapCurrCtx(); assert (((pos == -1) && (readInputMapJoin)) || (pos != -1)); int parentPos = (pos == -1) ? 0 : pos; GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get( parentPos)); Task currTask = mapredCtx.getCurrTask(); MapredWork plan = (MapredWork) currTask.getWork(); - HashMap, Task> opTaskMap = - opProcCtx.getOpTaskMap(); + HashMap, Task> opTaskMap = + opProcCtx.getOpTaskMap(); Operator currTopOp = opProcCtx.getCurrTopOp(); // The mapjoin has already been encountered. Some context must be stored @@ -182,6 +187,7 @@ public final class GenMapRedUtils { opTaskMap.put(reducer, currTask); if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf(); plan.setNumReduceTasks(desc.getNumReducers()); @@ -232,7 +238,8 @@ public final class GenMapRedUtils { seenOps.add(currTopOp); boolean local = (pos == desc.getPosBigTable()) ? 
false : true; setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx); - setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator)op, createLocalPlan); + setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator) op, + createLocalPlan); } opProcCtx.setCurrTask(currTask); @@ -244,12 +251,12 @@ public final class GenMapRedUtils { AbstractMapJoinOperator currMapJoinOp, boolean createLocalPlan) { if (currMapJoinOp != null) { Map>> aliasBucketFileNameMapping = - currMapJoinOp.getConf().getAliasBucketFileNameMapping(); - if(aliasBucketFileNameMapping!= null) { + currMapJoinOp.getConf().getAliasBucketFileNameMapping(); + if (aliasBucketFileNameMapping != null) { MapredLocalWork localPlan = plan.getMapLocalWork(); - if(localPlan == null) { - if(currMapJoinOp instanceof SMBMapJoinOperator) { - localPlan = ((SMBMapJoinOperator)currMapJoinOp).getConf().getLocalWork(); + if (localPlan == null) { + if (currMapJoinOp instanceof SMBMapJoinOperator) { + localPlan = ((SMBMapJoinOperator) currMapJoinOp).getConf().getLocalWork(); } if (localPlan == null && createLocalPlan) { localPlan = new MapredLocalWork( @@ -257,23 +264,24 @@ public final class GenMapRedUtils { new LinkedHashMap()); } } else { - //local plan is not null, we want to merge it into SMBMapJoinOperator's local work - if(currMapJoinOp instanceof SMBMapJoinOperator) { - MapredLocalWork smbLocalWork = ((SMBMapJoinOperator)currMapJoinOp).getConf().getLocalWork(); - if(smbLocalWork != null) { + // local plan is not null, we want to merge it into SMBMapJoinOperator's local work + if (currMapJoinOp instanceof SMBMapJoinOperator) { + MapredLocalWork smbLocalWork = + ((SMBMapJoinOperator) currMapJoinOp).getConf().getLocalWork(); + if (smbLocalWork != null) { localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork()); localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork()); } } } - if(localPlan == null) { + if (localPlan == null) { return; } - if(currMapJoinOp instanceof SMBMapJoinOperator) { + if (currMapJoinOp instanceof SMBMapJoinOperator) { plan.setMapLocalWork(null); - ((SMBMapJoinOperator)currMapJoinOp).getConf().setLocalWork(localPlan); + ((SMBMapJoinOperator) currMapJoinOp).getConf().setLocalWork(localPlan); } else { plan.setMapLocalWork(localPlan); } @@ -283,7 +291,8 @@ public final class GenMapRedUtils { bucketMJCxt.setBucketFileNameMapping(currMapJoinOp.getConf().getBigTableBucketNumMapping()); localPlan.setInputFileChangeSensitive(true); bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias()); - bucketMJCxt.setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class); + bucketMJCxt + .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class); bucketMJCxt.setBigTablePartSpecToFileMapping( currMapJoinOp.getConf().getBigTablePartSpecToFileMapping()); plan.setSmbJoin(currMapJoinOp instanceof SMBMapJoinOperator); @@ -293,7 +302,7 @@ public final class GenMapRedUtils { /** * Initialize the current union plan. 
- * + * * @param op * the reduce sink operator encountered * @param opProcCtx @@ -306,7 +315,7 @@ public final class GenMapRedUtils { MapredWork plan = (MapredWork) unionTask.getWork(); HashMap, Task> opTaskMap = - opProcCtx.getOpTaskMap(); + opProcCtx.getOpTaskMap(); opTaskMap.put(reducer, unionTask); plan.setReducer(reducer); @@ -316,6 +325,7 @@ public final class GenMapRedUtils { if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } initUnionPlan(opProcCtx, unionTask, false); @@ -435,7 +445,7 @@ public final class GenMapRedUtils { /** * Merge the current task with the task for the current reducer. - * + * * @param op * operator being processed * @param oldTask @@ -486,8 +496,9 @@ public final class GenMapRedUtils { : true; } setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx); - if(op instanceof AbstractMapJoinOperator) { - setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator)op, createLocalWork); + if (op instanceof AbstractMapJoinOperator) { + setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator) op, + createLocalWork); } } currTopOp = null; @@ -532,7 +543,7 @@ public final class GenMapRedUtils { if ((oldTask != null) && (parTasks != null)) { for (Task parTask : parTasks) { parTask.addDependentTask(currTask); - if(opProcCtx.getRootTasks().contains(currTask)) { + if (opProcCtx.getRootTasks().contains(currTask)) { opProcCtx.getRootTasks().remove(currTask); } } @@ -543,14 +554,14 @@ public final class GenMapRedUtils { /** * Split the current plan by creating a temporary destination. - * + * * @param op * the reduce sink operator encountered * @param opProcCtx * processing context */ public static void splitPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx) - throws SemanticException { + throws SemanticException { // Generate a new task ParseContext parseCtx = opProcCtx.getParseCtx(); MapredWork cplan = getMapRedWork(parseCtx); @@ -565,7 +576,7 @@ public final class GenMapRedUtils { cplan.setNumReduceTasks(new Integer(desc.getNumReducers())); HashMap, Task> opTaskMap = - opProcCtx.getOpTaskMap(); + opProcCtx.getOpTaskMap(); opTaskMap.put(reducer, redTask); Task currTask = opProcCtx.getCurrTask(); @@ -575,7 +586,7 @@ public final class GenMapRedUtils { /** * set the current task in the mapredWork. - * + * * @param alias_id * current alias * @param topOp @@ -595,7 +606,7 @@ public final class GenMapRedUtils { /** * set the current task in the mapredWork. 
- * + * * @param alias_id * current alias * @param topOp @@ -627,12 +638,12 @@ public final class GenMapRedUtils { if (partsList == null) { try { - partsList = parseCtx.getOpToPartList().get((TableScanOperator)topOp); + partsList = parseCtx.getOpToPartList().get(topOp); if (partsList == null) { partsList = PartitionPruner.prune(parseCtx.getTopToTable().get(topOp), - parseCtx.getOpToPartPruner().get(topOp), opProcCtx.getConf(), - alias_id, parseCtx.getPrunedPartitions()); - parseCtx.getOpToPartList().put((TableScanOperator)topOp, partsList); + parseCtx.getOpToPartPruner().get(topOp), opProcCtx.getConf(), + alias_id, parseCtx.getPrunedPartitions()); + parseCtx.getOpToPartList().put((TableScanOperator) topOp, partsList); } } catch (SemanticException e) { throw e; @@ -671,7 +682,8 @@ public final class GenMapRedUtils { long sizeNeeded = Integer.MAX_VALUE; int fileLimit = -1; if (parseCtx.getGlobalLimitCtx().isEnable()) { - long sizePerRow = HiveConf.getLongVar(parseCtx.getConf(), HiveConf.ConfVars.HIVELIMITMAXROWSIZE); + long sizePerRow = + HiveConf.getLongVar(parseCtx.getConf(), HiveConf.ConfVars.HIVELIMITMAXROWSIZE); sizeNeeded = parseCtx.getGlobalLimitCtx().getGlobalLimit() * sizePerRow; // for the optimization that reduce number of input file, we limit number // of files allowed. If more than specific number of files have to be @@ -679,7 +691,7 @@ public final class GenMapRedUtils { // inputs can cause unpredictable latency. It's not necessarily to be // cheaper. fileLimit = - HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVELIMITOPTLIMITFILE); + HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVELIMITOPTLIMITFILE); if (sizePerRow <= 0 || fileLimit <= 0) { LOG.info("Skip optimization to reduce input size of 'limit'"); @@ -836,7 +848,7 @@ public final class GenMapRedUtils { /** * set the current task in the mapredWork. - * + * * @param alias * current alias * @param topOp @@ -852,7 +864,7 @@ public final class GenMapRedUtils { Operator topOp, MapredWork plan, boolean local, TableDesc tt_desc) throws SemanticException { - if(path == null || alias == null) { + if (path == null || alias == null) { return; } @@ -882,7 +894,7 @@ public final class GenMapRedUtils { /** * set key and value descriptor. - * + * * @param plan * current plan * @param topOp @@ -915,7 +927,7 @@ public final class GenMapRedUtils { /** * create a new plan and return. - * + * * @return the new plan */ public static MapredWork getMapRedWork(ParseContext parseCtx) { @@ -927,15 +939,15 @@ public final class GenMapRedUtils { /** * create a new plan and return. The pan won't contain the name to split * sample information in parse context. - * + * * @return the new plan */ public static MapredWork getMapRedWorkFromConf(HiveConf conf) { MapredWork work = new MapredWork(); boolean mapperCannotSpanPartns = - conf.getBoolVar( - HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS); + conf.getBoolVar( + HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS); work.setMapperCannotSpanPartns(mapperCannotSpanPartns); work.setPathToAliases(new LinkedHashMap>()); work.setPathToPartitionInfo(new LinkedHashMap()); @@ -949,7 +961,7 @@ public final class GenMapRedUtils { /** * insert in the map for the operator to row resolver. 
- * + * * @param op * operator created * @param rr @@ -1016,7 +1028,7 @@ public final class GenMapRedUtils { // replace the reduce child with this operator List> childOpList = parent - .getChildOperators(); + .getChildOperators(); for (int pos = 0; pos < childOpList.size(); pos++) { if (childOpList.get(pos) == op) { childOpList.set(pos, fs_op); @@ -1025,7 +1037,7 @@ public final class GenMapRedUtils { } List> parentOpList = - new ArrayList>(); + new ArrayList>(); parentOpList.add(parent); fs_op.setParentOperators(parentOpList); @@ -1041,7 +1053,7 @@ public final class GenMapRedUtils { op.getParentOperators().set(posn, ts_op); Map, GenMapRedCtx> mapCurrCtx = - opProcCtx.getMapCurrCtx(); + opProcCtx.getMapCurrCtx(); mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null)); String streamDesc = taskTmpDir; @@ -1064,6 +1076,7 @@ public final class GenMapRedUtils { // dependent on the redTask if (reducer.getClass() == JoinOperator.class) { cplan.setNeedsTagging(true); + cplan.setNeedsOperationPathTagging(false); } } @@ -1072,7 +1085,8 @@ public final class GenMapRedUtils { // This can be cleaned up as a function table in future if (op instanceof AbstractMapJoinOperator) { - AbstractMapJoinOperator mjOp = (AbstractMapJoinOperator) op; + AbstractMapJoinOperator mjOp = + (AbstractMapJoinOperator) op; opProcCtx.setCurrMapJoinOp(mjOp); GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp); if (mjCtx == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index 67d3a99..e45c4e2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -45,6 +45,16 @@ public class Optimizer { */ public void initialize(HiveConf hiveConf) { transformations = new ArrayList(); + // Add correlation optimizer for first phase query plan tree analysis. + // The first phase will record original opColumnExprMap, opParseCtx, opRowResolver, + // since these may be changed by other optimizers (e.g. entries in opColumnExprMap may be deleted). + // If hive.groupby.skewindata is on, CorrelationOptimizer will not be applied. + // TODO: Make correlation optimizer 1 phase. + CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer(); + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) && + !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW)) { + transformations.add(correlationOptimizer); + } // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) { @@ -77,6 +87,12 @@ public class Optimizer { if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) { transformations.add(new GlobalLimitOptimizer()); } + // The second phase of correlation optimizer used for correlation detection and query plan tree transformation. + // The second phase should be the last optimizer added into transformations. 
+ if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) && + !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW)) { + transformations.add(correlationOptimizer); + } transformations.add(new SimpleFetchOptimizer()); // must be called last } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 8bacd3d..49e74ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.LineageInfo; @@ -86,6 +87,11 @@ public class ParseContext { private Map> groupOpToInputTables; private Map prunedPartitions; + //a map from non-map-side group by pattern (RS-GBY) to map-side group by pattern (GBY-RS-GBY) + Map groupbyNonMapSide2MapSide; + //a map from map-side group by pattern (GBY-RS-GBY) to non-map-side group by pattern (RS-GBY) + Map groupbyMapSide2NonMapSide; + /** * The lineage information. */ @@ -169,7 +175,9 @@ public class ParseContext { GlobalLimitCtx globalLimitCtx, HashMap nameToSplitSample, HashSet semanticInputs, List> rootTasks, - Map opToSkewedPruner) { + Map opToSkewedPruner, + Map groupbyNonMapSide2MapSide, + Map groupbyMapSide2NonMapSide) { this.conf = conf; this.qb = qb; this.ast = ast; @@ -196,6 +204,8 @@ public class ParseContext { this.semanticInputs = semanticInputs; this.rootTasks = rootTasks; this.opToSkewedPruner = opToSkewedPruner; + this.groupbyNonMapSide2MapSide = groupbyNonMapSide2MapSide; + this.groupbyMapSide2NonMapSide = groupbyMapSide2NonMapSide; } /** @@ -538,7 +548,7 @@ public class ParseContext { } public void replaceRootTask(Task rootTask, - List> tasks) { + List> tasks) { this.rootTasks.remove(rootTask); this.rootTasks.addAll(tasks); } @@ -576,4 +586,11 @@ public class ParseContext { this.opToSkewedPruner = opToSkewedPruner; } + public Map getGroupbyNonMapSide2MapSide() { + return groupbyNonMapSide2MapSide; + } + + public Map getGroupbyMapSide2NonMapSide() { + return groupbyMapSide2NonMapSide; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 73a62ea..fef356a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -70,8 +70,8 @@ import org.apache.hadoop.hive.ql.exec.RecordReader; import org.apache.hadoop.hive.ql.exec.RecordWriter; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; -import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -188,7 +188,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private List loadTableWork; private List loadFileWork; private Map joinContext; - private final HashMap topToTable; + private HashMap topToTable; 
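
A note on the two Optimizer.initialize() hunks above: the same CorrelationOptimizer instance is registered both before and after the other transformations, so its transform() runs twice on the same ParseContext (first to record original metadata, then to rewrite the plan). The sketch below is only a hypothetical illustration of that two-pass pattern; the Transform stand-in and the class, field, and method names are invented for the example and are not the patch's actual CorrelationOptimizer code.

public class TwoPassOptimizerSketch {

  // Stand-in for org.apache.hadoop.hive.ql.optimizer.Transform; the real interface
  // takes and returns a ParseContext, simplified here to Object.
  interface Transform {
    Object transform(Object parseContext) throws Exception;
  }

  // Hypothetical optimizer with the two-pass behaviour the comments describe:
  // the first call only snapshots plan metadata, the second call rewrites the plan.
  static class CorrelationLikeOptimizer implements Transform {
    private boolean firstPassDone = false;
    private Object snapshot; // e.g. copies of opColumnExprMap / opParseCtx / opRowResolver

    @Override
    public Object transform(Object pctx) {
      if (!firstPassDone) {
        snapshot = takeSnapshot(pctx); // phase 1: record state before other optimizers mutate it
        firstPassDone = true;
        return pctx;                   // plan left untouched
      }
      return rewritePlan(pctx, snapshot); // phase 2: detect correlations and rewrite the tree
    }

    private Object takeSnapshot(Object pctx) { return pctx; }             // placeholder
    private Object rewritePlan(Object pctx, Object snap) { return pctx; } // placeholder
  }

  public static void main(String[] args) throws Exception {
    java.util.List<Transform> transformations = new java.util.ArrayList<Transform>();
    Transform correlation = new CorrelationLikeOptimizer();
    transformations.add(correlation); // added first: snapshot pass
    // ... the other optimizers would be registered here ...
    transformations.add(correlation); // added last: transformation pass
    Object pctx = new Object();
    for (Transform t : transformations) {
      pctx = t.transform(pctx);
    }
  }
}
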
private QB qb; private ASTNode ast; private int destTableId; @@ -210,11 +210,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private final UnparseTranslator unparseTranslator; private final GlobalLimitCtx globalLimitCtx = new GlobalLimitCtx(); - //prefix for column names auto generated by hive + // a map from non-map-side group by pattern (RS-GBY) to map-side group by pattern (GBY-RS-GBY) + Map groupbyNonMapSide2MapSide; + // a map from map-side group by pattern (GBY-RS-GBY) to non-map-side group by pattern (RS-GBY) + Map groupbyMapSide2NonMapSide; + + // prefix for column names auto generated by hive private final String autogenColAliasPrfxLbl; private final boolean autogenColAliasPrfxIncludeFuncName; - //Max characters when auto generating the column name with func name + // Max characters when auto generating the column name with func name private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20; private static class Phase1Ctx { @@ -243,11 +248,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { prunedPartitions = new HashMap(); unparseTranslator = new UnparseTranslator(); autogenColAliasPrfxLbl = HiveConf.getVar(conf, - HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL); + HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL); autogenColAliasPrfxIncludeFuncName = HiveConf.getBoolVar(conf, - HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME); + HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME); queryProperties = new QueryProperties(); opToSkewedPruner = new HashMap(); + groupbyNonMapSide2MapSide = new HashMap(); + groupbyMapSide2NonMapSide = new HashMap(); } @Override @@ -266,6 +273,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { opParseCtx.clear(); groupOpToInputTables.clear(); prunedPartitions.clear(); + topToTable.clear(); + groupbyNonMapSide2MapSide.clear(); + groupbyMapSide2NonMapSide.clear(); } public void init(ParseContext pctx) { @@ -273,6 +283,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { opToPartList = pctx.getOpToPartList(); opToSamplePruner = pctx.getOpToSamplePruner(); topOps = pctx.getTopOps(); + topToTable = pctx.getTopToTable(); topSelOps = pctx.getTopSelOps(); opParseCtx = pctx.getOpParseCtx(); loadTableWork = pctx.getLoadTableWork(); @@ -288,6 +299,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { prunedPartitions = pctx.getPrunedPartitions(); fetchTask = pctx.getFetchTask(); setLineageInfo(pctx.getLineageInfo()); + groupbyNonMapSide2MapSide = pctx.getGroupbyNonMapSide2MapSide(); + groupbyMapSide2NonMapSide = pctx.getGroupbyMapSide2NonMapSide(); } public ParseContext getParseContext() { @@ -295,7 +308,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToSkewedPruner); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToSkewedPruner, + groupbyNonMapSide2MapSide, groupbyMapSide2NonMapSide); } @SuppressWarnings("nls") @@ -347,7 +361,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ASTNode selectExpr, QBParseInfo qbp) { for (int i = 0; i < selectExpr.getChildCount(); ++i) { ASTNode selExpr = (ASTNode) selectExpr.getChild(i); - if ((selExpr.getToken().getType() == HiveParser.TOK_SELEXPR) && (selExpr.getChildCount() == 2)) { + if 
((selExpr.getToken().getType() == HiveParser.TOK_SELEXPR) + && (selExpr.getChildCount() == 2)) { String columnAlias = unescapeIdentifier(selExpr.getChild(1).getText()); qbp.setExprToColumnAlias((ASTNode) selExpr.getChild(0), columnAlias); } @@ -357,7 +372,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * DFS-scan the expressionTree to find all aggregation subtrees and put them * in aggregations. - * + * * @param expressionTree * @param aggregations * the key to the HashTable is the toStringTree() representation of @@ -420,7 +435,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Goes though the tabref tree and finds the alias for the table. Once found, * it records the table name-> alias association in aliasToTabs. It also makes * an association from the alias to the table AST in parse info. - * + * * @return the alias of the table */ private String processTable(QB qb, ASTNode tabref) throws SemanticException { @@ -482,15 +497,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Need to change it to list of columns if (sampleCols.size() > 2) { throw new SemanticException(generateErrorMessage( - (ASTNode) tabref.getChild(0), - ErrorMsg.SAMPLE_RESTRICTION.getMsg())); + (ASTNode) tabref.getChild(0), + ErrorMsg.SAMPLE_RESTRICTION.getMsg())); } qb.getParseInfo().setTabSample( alias, new TableSample( - unescapeIdentifier(sampleClause.getChild(0).getText()), - unescapeIdentifier(sampleClause.getChild(1).getText()), - sampleCols)); + unescapeIdentifier(sampleClause.getChild(0).getText()), + unescapeIdentifier(sampleClause.getChild(1).getText()), + sampleCols)); if (unparseTranslator.isEnabled()) { for (ASTNode sampleCol : sampleCols) { unparseTranslator.addIdentifierTranslation((ASTNode) sampleCol @@ -501,7 +516,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // only CombineHiveInputFormat supports this optimize String inputFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT); if (!inputFormat.equals( - CombineHiveInputFormat.class.getName())) { + CombineHiveInputFormat.class.getName())) { throw new SemanticException(generateErrorMessage((ASTNode) tabref.getChild(1), "Percentage sampling is not supported in " + inputFormat)); } @@ -509,9 +524,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { String alias_id = getAliasId(alias, qb); String strPercentage = unescapeIdentifier(sampleClause.getChild(0).getText()); Double percent = Double.valueOf(strPercentage).doubleValue(); - if (percent < 0 || percent > 100) { + if (percent < 0 || percent > 100) { throw new SemanticException(generateErrorMessage(sampleClause, - "Sampling percentage should be between 0 and 100")); + "Sampling percentage should be between 0 and 100")); } nameToSplitSample.put(alias_id, new SplitSample( percent, conf.getIntVar(ConfVars.HIVESAMPLERANDOMNUM))); @@ -576,7 +591,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Given the AST with TOK_JOIN as the root, get all the aliases for the tables * or subqueries in the join. 
- * + * * @param qb * @param join * @throws SemanticException @@ -587,7 +602,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if ((numChildren != 2) && (numChildren != 3) && join.getToken().getType() != HiveParser.TOK_UNIQUEJOIN) { throw new SemanticException(generateErrorMessage(join, - "Join with multiple children")); + "Join with multiple children")); } for (int num = 0; num < numChildren; num++) { @@ -613,7 +628,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Given the AST with TOK_LATERAL_VIEW as the root, get the alias for the * table or subquery in the lateral view and also make a mapping from the * alias to all the lateral view AST's. - * + * * @param qb * @param lateralView * @return the alias for the table/subquery @@ -651,7 +666,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Phase 1: (including, but not limited to): - * + * * 1. Gets all the aliases for all the tables / subqueries and makes the * appropriate mapping in aliasToTabs, aliasToSubq 2. Gets the location of the * destination and names the clase "inclause" + i 3. Creates a map from a @@ -659,7 +674,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * 4. Creates a mapping from the clause name to the select expression AST in * destToSelExpr 5. Creates a mapping from a table alias to the lateral view * AST's in aliasToLateralViews - * + * * @param ast * @param qb * @param ctx_1 @@ -699,7 +714,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { break; case HiveParser.TOK_INSERT_INTO: - String tab_name = getUnescapedName((ASTNode)ast.getChild(0). + String tab_name = getUnescapedName((ASTNode) ast.getChild(0). getChild(0)); qbp.addInsertIntoTable(tab_name); @@ -724,7 +739,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { int child_count = ast.getChildCount(); if (child_count != 1) { throw new SemanticException(generateErrorMessage(ast, - "Multiple Children " + child_count)); + "Multiple Children " + child_count)); } // Check if this is a subquery / lateral view @@ -757,10 +772,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { qbp.setDistributeByExprForClause(ctx_1.dest, ast); if (qbp.getClusterByForClause(ctx_1.dest) != null) { throw new SemanticException(generateErrorMessage(ast, - ErrorMsg.CLUSTERBY_DISTRIBUTEBY_CONFLICT.getMsg())); + ErrorMsg.CLUSTERBY_DISTRIBUTEBY_CONFLICT.getMsg())); } else if (qbp.getOrderByForClause(ctx_1.dest) != null) { throw new SemanticException(generateErrorMessage(ast, - ErrorMsg.ORDERBY_DISTRIBUTEBY_CONFLICT.getMsg())); + ErrorMsg.ORDERBY_DISTRIBUTEBY_CONFLICT.getMsg())); } break; @@ -771,10 +786,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { qbp.setSortByExprForClause(ctx_1.dest, ast); if (qbp.getClusterByForClause(ctx_1.dest) != null) { throw new SemanticException(generateErrorMessage(ast, - ErrorMsg.CLUSTERBY_SORTBY_CONFLICT.getMsg())); + ErrorMsg.CLUSTERBY_SORTBY_CONFLICT.getMsg())); } else if (qbp.getOrderByForClause(ctx_1.dest) != null) { throw new SemanticException(generateErrorMessage(ast, - ErrorMsg.ORDERBY_SORTBY_CONFLICT.getMsg())); + ErrorMsg.ORDERBY_SORTBY_CONFLICT.getMsg())); } break; @@ -786,7 +801,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { qbp.setOrderByExprForClause(ctx_1.dest, ast); if (qbp.getClusterByForClause(ctx_1.dest) != null) { throw new SemanticException(generateErrorMessage(ast, - ErrorMsg.CLUSTERBY_ORDERBY_CONFLICT.getMsg())); + ErrorMsg.CLUSTERBY_ORDERBY_CONFLICT.getMsg())); } break; @@ -799,7 +814,7 @@ 
public class SemanticAnalyzer extends BaseSemanticAnalyzer { } if (qbp.getSelForClause(ctx_1.dest).getToken().getType() == HiveParser.TOK_SELECTDI) { throw new SemanticException(generateErrorMessage(ast, - ErrorMsg.SELECT_DISTINCT_WITH_GROUPBY.getMsg())); + ErrorMsg.SELECT_DISTINCT_WITH_GROUPBY.getMsg())); } qbp.setGroupByExprForClause(ctx_1.dest, ast); skipRecursion = true; @@ -816,7 +831,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { case HiveParser.TOK_ANALYZE: // Case of analyze command - String table_name = getUnescapedName((ASTNode)ast.getChild(0).getChild(0)); + String table_name = getUnescapedName((ASTNode) ast.getChild(0).getChild(0)); qb.setTabAlias(table_name, table_name); qb.addAlias(table_name); qb.getParseInfo().setIsAnalyzeCommand(true); @@ -832,7 +847,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // select * from (subq1 union subq2) subqalias if (!qbp.getIsSubQ()) { throw new SemanticException(generateErrorMessage(ast, - ErrorMsg.UNION_NOTIN_SUBQ.getMsg())); + ErrorMsg.UNION_NOTIN_SUBQ.getMsg())); } case HiveParser.TOK_INSERT: @@ -873,7 +888,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } else { throw new SemanticException(ErrorMsg.INSERT_INTO_DYNAMICPARTITION_IFNOTEXISTS - .getMsg(partition.toString())); + .getMsg(partition.toString())); } } @@ -925,7 +940,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } // Disallow INSERT INTO on bucketized tables - if(qb.getParseInfo().isInsertIntoTable(tab_name) && + if (qb.getParseInfo().isInsertIntoTable(tab_name) && tab.getNumBuckets() > 0) { throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE. getMsg("Table: " + tab_name)); @@ -947,9 +962,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (qb.getParseInfo().isAnalyzeCommand()) { throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg()); } - String fullViewName = tab.getDbName()+"."+tab.getTableName(); + String fullViewName = tab.getDbName() + "." 
+ tab.getTableName(); // Prevent view cycles - if(viewsExpanded.contains(fullViewName)){ + if (viewsExpanded.contains(fullViewName)) { throw new SemanticException("Recursive view " + fullViewName + " detected (cycle: " + StringUtils.join(viewsExpanded, " -> ") + " -> " + fullViewName + ")."); @@ -964,8 +979,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (!InputFormat.class.isAssignableFrom(tab.getInputFormatClass())) { throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getSrcForAlias(alias), - ErrorMsg.INVALID_INPUT_FORMAT_TYPE.getMsg())); + qb.getParseInfo().getSrcForAlias(alias), + ErrorMsg.INVALID_INPUT_FORMAT_TYPE.getMsg())); } qb.getMetaData().setSrcForAlias(alias, tab); @@ -976,8 +991,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { try { ts.partitions = db.getPartitionsByNames(ts.tableHandle, ts.partSpec); } catch (HiveException e) { - throw new SemanticException(generateErrorMessage(qb.getParseInfo().getSrcForAlias(alias), - "Cannot get partitions for " + ts.partSpec), e); + throw new SemanticException(generateErrorMessage( + qb.getParseInfo().getSrcForAlias(alias), + "Cannot get partitions for " + ts.partSpec), e); } } qb.getParseInfo().addTableSpec(alias, ts); @@ -994,7 +1010,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { QBExpr qbexpr = qb.getSubqForAlias(alias); getMetaData(qbexpr); if (wasView) { - viewsExpanded.remove(viewsExpanded.size()-1); + viewsExpanded.remove(viewsExpanded.size() - 1); } } @@ -1060,7 +1076,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { FileUtils.makeQualified(new Path(location), conf).toUri()); } catch (Exception e) { throw new SemanticException(generateErrorMessage(ast, - "Error creating temporary folder on: " + location), e); + "Error creating temporary folder on: " + location), e); } if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { tableSpec ts = new tableSpec(db, conf, this.ast); @@ -1081,7 +1097,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } default: throw new SemanticException(generateErrorMessage(ast, - "Unknown Token Type " + ast.getToken().getType())); + "Unknown Token Type " + ast.getToken().getType())); } } } catch (HiveException e) { @@ -1099,7 +1115,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ASTNode viewTree; final ASTNodeOrigin viewOrigin = new ASTNodeOrigin("VIEW", tab.getTableName(), tab.getViewExpandedText(), alias, qb.getParseInfo().getSrcForAlias( - alias)); + alias)); try { String viewText = tab.getViewExpandedText(); // Reparse text, passing null for context to avoid clobbering @@ -1273,7 +1289,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * condition involves both subtrees and is not a equality. Also, we only * support AND i.e ORs are not supported currently as their semantics are not * very clear, may lead to data explosion and there is no usecase. 
- * + * * @param joinTree * jointree to be populated * @param joinCond @@ -1329,7 +1345,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if ((rightCondAl1.size() != 0) || ((rightCondAl1.size() == 0) && (rightCondAl2.size() == 0))) { if (type.equals(JoinType.LEFTOUTER) || - type.equals(JoinType.FULLOUTER)) { + type.equals(JoinType.FULLOUTER)) { if (conf.getBoolVar(HiveConf.ConfVars.HIVEOUTERJOINSUPPORTSFILTERS)) { joinTree.getFilters().get(0).add(joinCond); } else { @@ -1416,7 +1432,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { for (int ci = childrenBegin; ci < joinCond.getChildCount(); ci++) { parseJoinCondPopulateAlias(joinTree, (ASTNode) joinCond.getChild(ci), leftAlias.get(ci - childrenBegin), rightAlias.get(ci - - childrenBegin), null); + - childrenBegin), null); } boolean leftAliasNull = true; @@ -1497,7 +1513,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new FilterDesc(genExprNodeDesc(condn, inputRR), false), new RowSchema( - inputRR.getColumnInfos()), input), inputRR); + inputRR.getColumnInfos()), input), inputRR); return output; } @@ -1512,7 +1528,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * create a filter plan. The condition and the inputs are specified. - * + * * @param qb * current query block * @param condn @@ -1528,11 +1544,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { RowResolver inputRR = inputCtx.getRowResolver(); Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new FilterDesc(genExprNodeDesc(condn, inputRR), false), new RowSchema( - inputRR.getColumnInfos()), input), inputRR); + inputRR.getColumnInfos()), input), inputRR); if (LOG.isDebugEnabled()) { LOG.debug("Created Filter Plan for " + qb.getId() + " row schema: " - + inputRR.toString()); + + inputRR.toString()); } return output; } @@ -1597,8 +1613,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { col_list.add(expr); output.put(tmp[0], tmp[1], new ColumnInfo(getColumnInternalName(pos), colInfo.getType(), - colInfo.getTabAlias(), colInfo.getIsVirtualCol(), - colInfo.isHiddenVirtualCol())); + colInfo.getTabAlias(), colInfo.getIsVirtualCol(), + colInfo.isHiddenVirtualCol())); pos = Integer.valueOf(pos.intValue() + 1); matched++; @@ -1720,7 +1736,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { tblDesc.getProperties().setProperty(Constants.LINE_DELIM, lineDelim); if (!lineDelim.equals("\n") && !lineDelim.equals("10")) { throw new SemanticException(generateErrorMessage(rowChild, - ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg())); + ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg())); } break; default: @@ -1805,7 +1821,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { String intName = getColumnInternalName(i); ColumnInfo colInfo = new ColumnInfo(intName, TypeInfoUtils .getTypeInfoFromTypeString(getTypeStringFromAST((ASTNode) child - .getChild(1))), null, false); + .getChild(1))), null, false); colInfo.setAlias(colAlias); outputCols.add(colInfo); } @@ -1898,8 +1914,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new ScriptDesc( - fetchFilesNotInLocalFilesystem(stripQuotes(trfm.getChild(execPos).getText())), - inInfo, inRecordWriter, outInfo, outRecordReader, errRecordReader, errInfo), + fetchFilesNotInLocalFilesystem(stripQuotes(trfm.getChild(execPos).getText())), + inInfo, inRecordWriter, outInfo, 
outRecordReader, errRecordReader, errInfo), new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch); return output; @@ -2007,7 +2023,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ASTNode root = (ASTNode) selExpr.getChild(0); if (root.getType() == HiveParser.TOK_TABLE_OR_COL) { colAlias = - BaseSemanticAnalyzer.unescapeIdentifier(root.getChild(0).getText()); + BaseSemanticAnalyzer.unescapeIdentifier(root.getChild(0).getText()); colRef[0] = tabAlias; colRef[1] = colAlias; return colRef; @@ -2029,23 +2045,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } - //if specified generate alias using func name + // if specified generate alias using func name if (includeFuncName && (root.getType() == HiveParser.TOK_FUNCTION)) { String expr_flattened = root.toStringTree(); - //remove all TOK tokens + // remove all TOK tokens String expr_no_tok = expr_flattened.replaceAll("TOK_\\S+", ""); - //remove all non alphanumeric letters, replace whitespace spans with underscore - String expr_formatted = expr_no_tok.replaceAll("\\W", " ").trim().replaceAll("\\s+", "_"); + // remove all non alphanumeric letters, replace whitespace spans with underscore + String expr_formatted = expr_no_tok.replaceAll("\\W", " ").trim().replaceAll("\\s+", "_"); - //limit length to 20 chars - if(expr_formatted.length()>AUTOGEN_COLALIAS_PRFX_MAXLENGTH) { + // limit length to 20 chars + if (expr_formatted.length() > AUTOGEN_COLALIAS_PRFX_MAXLENGTH) { expr_formatted = expr_formatted.substring(0, AUTOGEN_COLALIAS_PRFX_MAXLENGTH); } - //append colnum to make it unique + // append colnum to make it unique colAlias = expr_formatted.concat("_" + colNum); } @@ -2109,7 +2125,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } boolean isInTransform = (selExprList.getChild(posn).getChild(0).getType() == - HiveParser.TOK_TRANSFORM); + HiveParser.TOK_TRANSFORM); if (isInTransform) { queryProperties.setUsesScript(true); globalLimitCtx.setHasTransformOrUDTF(true); @@ -2149,14 +2165,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Only support a single expression when it's a UDTF if (selExprList.getChildCount() > 1) { throw new SemanticException(generateErrorMessage( - (ASTNode) selExprList.getChild(1), - ErrorMsg.UDTF_MULTIPLE_EXPR.getMsg())); + (ASTNode) selExprList.getChild(1), + ErrorMsg.UDTF_MULTIPLE_EXPR.getMsg())); } // Require an AS for UDTFs for column aliases ASTNode selExpr = (ASTNode) selExprList.getChild(posn); if (selExpr.getChildCount() < 2) { throw new SemanticException(generateErrorMessage(udtfExpr, - ErrorMsg.UDTF_REQUIRE_AS.getMsg())); + ErrorMsg.UDTF_REQUIRE_AS.getMsg())); } // Get the column / table aliases from the expression. Start from 1 as // 0 is the TOK_FUNCTION @@ -2217,8 +2233,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // AST's are slightly different. 
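
The getColAlias() hunk a little further up only reformats the comments around Hive's auto-generated column aliases, but the underlying transformation is easy to miss in the diff noise. The standalone sketch below replays the same regex pipeline (TOK_* removal, non-word stripping, whitespace-to-underscore, 20-character cap, column-number suffix); the sample input string is an assumed toStringTree() form used purely for illustration.

public class AutogenAliasSketch {
  private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20;

  static String autogenAlias(String exprFlattened, int colNum) {
    String exprNoTok = exprFlattened.replaceAll("TOK_\\S+", "");   // drop TOK_* tokens
    String exprFormatted = exprNoTok.replaceAll("\\W", " ")        // keep word chars only
        .trim()
        .replaceAll("\\s+", "_");                                  // whitespace spans -> "_"
    if (exprFormatted.length() > AUTOGEN_COLALIAS_PRFX_MAXLENGTH) {
      exprFormatted = exprFormatted.substring(0, AUTOGEN_COLALIAS_PRFX_MAXLENGTH);
    }
    return exprFormatted.concat("_" + colNum);                     // column number keeps it unique
  }

  public static void main(String[] args) {
    // e.g. "(TOK_FUNCTION substr (TOK_TABLE_OR_COL col) 1 2)" -> "substr_col_1_2_0"
    System.out.println(autogenAlias("(TOK_FUNCTION substr (TOK_TABLE_OR_COL col) 1 2)", 0));
  }
}
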
if (!isInTransform && !isUDTF && child.getChildCount() > 2) { throw new SemanticException(generateErrorMessage( - (ASTNode) child.getChild(2), - ErrorMsg.INVALID_AS.getMsg())); + (ASTNode) child.getChild(2), + ErrorMsg.INVALID_AS.getMsg())); } // The real expression @@ -2234,7 +2250,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Get rid of TOK_SELEXPR expr = (ASTNode) child.getChild(0); String[] colRef = getColAlias(child, autogenColAliasPrfxLbl, inputRR, - autogenColAliasPrfxIncludeFuncName, i); + autogenColAliasPrfxIncludeFuncName, i); tabAlias = colRef[0]; colAlias = colRef[1]; if (hasAsClause) { @@ -2246,7 +2262,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (expr.getType() == HiveParser.TOK_ALLCOLREF) { pos = genColListRegex(".*", expr.getChildCount() == 0 ? null - : getUnescapedName((ASTNode)expr.getChild(0)).toLowerCase(), + : getUnescapedName((ASTNode) expr.getChild(0)).toLowerCase(), expr, col_list, inputRR, pos, out_rwsch, qb.getAliases()); selectStar = true; } else if (expr.getType() == HiveParser.TOK_TABLE_OR_COL && !hasAsClause @@ -2260,7 +2276,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } else if (expr.getType() == HiveParser.DOT && expr.getChild(0).getType() == HiveParser.TOK_TABLE_OR_COL && inputRR.hasTableAlias(unescapeIdentifier(expr.getChild(0) - .getChild(0).getText().toLowerCase())) && !hasAsClause + .getChild(0).getText().toLowerCase())) && !hasAsClause && !inputRR.getIsExprResolver() && isRegex(unescapeIdentifier(expr.getChild(1).getText()))) { // In case the expression is TABLE.COL (col can be regex). @@ -2268,7 +2284,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // We don't allow this for ExprResolver - the Group By case pos = genColListRegex(unescapeIdentifier(expr.getChild(1).getText()), unescapeIdentifier(expr.getChild(0).getChild(0).getText() - .toLowerCase()), expr, col_list, inputRR, pos, out_rwsch, + .toLowerCase()), expr, col_list, inputRR, pos, out_rwsch, qb.getAliases()); } else { // Case when this is an expression @@ -2283,9 +2299,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } ColumnInfo colInfo = new ColumnInfo(getColumnInternalName(pos), - exp.getWritableObjectInspector(), tabAlias, false); + exp.getWritableObjectInspector(), tabAlias, false); colInfo.setSkewedCol((exp instanceof ExprNodeColumnDesc) ? ((ExprNodeColumnDesc) exp) - .isSkewedCol() : false); + .isSkewedCol() : false); out_rwsch.put(tabAlias, colAlias, colInfo); pos = Integer.valueOf(pos.intValue() + 1); @@ -2308,7 +2324,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new SelectDesc(col_list, columnNames, selectStar), new RowSchema( - out_rwsch.getColumnInfos()), input), out_rwsch); + out_rwsch.getColumnInfos()), input), out_rwsch); output.setColumnExprMap(colExprMap); if (isInTransform) { @@ -2377,7 +2393,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { boolean isDistinct, boolean isAllColumns) throws SemanticException { ArrayList originalParameterTypeInfos = - getWritableObjectInspector(aggParameters); + getWritableObjectInspector(aggParameters); GenericUDAFEvaluator result = FunctionRegistry.getGenericUDAFEvaluator( aggName, originalParameterTypeInfos, isDistinct, isAllColumns); if (null == result) { @@ -2391,7 +2407,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Returns the GenericUDAFInfo struct for the aggregation. 
- * + * * @param aggName * The name of the UDAF. * @param aggParameters @@ -2458,7 +2474,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)). * The new GroupByOperator will be a child of the reduceSinkOperatorInfo. - * + * * @param mode * The mode of the aggregation (PARTIAL1 or COMPLETE) * @param genericUDAFEvaluators @@ -2508,7 +2524,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { List inputKeyCols = ((ReduceSinkDesc) reduceSinkOperatorInfo.getConf()).getOutputKeyColumnNames(); if (inputKeyCols.size() > 0) { - lastKeyColName = inputKeyCols.get(inputKeyCols.size()-1); + lastKeyColName = inputKeyCols.get(inputKeyCols.size() - 1); } } int numDistinctUDFs = 0; @@ -2526,7 +2542,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { for (int i = 1; i < value.getChildCount(); i++) { ASTNode paraExpr = (ASTNode) value.getChild(i); ColumnInfo paraExprInfo = - groupByInputRowResolver.getExpression(paraExpr); + groupByInputRowResolver.getExpression(paraExpr); if (paraExprInfo == null) { throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(paraExpr)); } @@ -2537,8 +2553,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // if aggr is distinct, the parameter is name is constructed as // KEY.lastKeyColName:._colx paraExpression = Utilities.ReduceField.KEY.name() + "." + - lastKeyColName + ":" + numDistinctUDFs + "." + - getColumnInternalName(i-1); + lastKeyColName + ":" + numDistinctUDFs + "." + + getColumnInternalName(i - 1); } aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(), @@ -2570,11 +2586,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); - float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); - Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()), - reduceSinkOperatorInfo), groupByOutputRowResolver); + float memoryThreshold = + HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + Operator op = + putOpInsertMap(OperatorFactory.getAndMakeChild( + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false, groupByMemoryUsage, memoryThreshold), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), + reduceSinkOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; } @@ -2582,7 +2601,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)). * The new GroupByOperator will be a child of the reduceSinkOperatorInfo. 
- * + * * @param mode * The mode of the aggregation (MERGEPARTIAL, PARTIAL2) * @param genericUDAFEvaluators @@ -2633,7 +2652,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { List inputKeyCols = ((ReduceSinkDesc) reduceSinkOperatorInfo.getConf()).getOutputKeyColumnNames(); if (inputKeyCols.size() > 0) { - lastKeyColName = inputKeyCols.get(inputKeyCols.size()-1); + lastKeyColName = inputKeyCols.get(inputKeyCols.size() - 1); } } int numDistinctUDFs = 0; @@ -2659,7 +2678,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { for (int i = 1; i < value.getChildCount(); i++) { ASTNode paraExpr = (ASTNode) value.getChild(i); ColumnInfo paraExprInfo = - groupByInputRowResolver.getExpression(paraExpr); + groupByInputRowResolver.getExpression(paraExpr); if (paraExprInfo == null) { throw new SemanticException(ErrorMsg.INVALID_COLUMN .getMsg(paraExpr)); @@ -2671,8 +2690,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // if aggr is distinct, the parameter is name is constructed as // KEY.lastKeyColName:._colx paraExpression = Utilities.ReduceField.KEY.name() + "." + - lastKeyColName + ":" + numDistinctUDFs + "." - + getColumnInternalName(i-1); + lastKeyColName + ":" + numDistinctUDFs + "." + + getColumnInternalName(i - 1); } aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(), @@ -2688,7 +2707,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { assert (paraExpression != null); aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(), paraExpression, paraExprInfo.getTabAlias(), paraExprInfo - .getIsVirtualCol())); + .getIsVirtualCol())); } if (isDistinct) { numDistinctUDFs++; @@ -2719,12 +2738,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { field, udaf.returnType, "", false)); } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); - float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); - Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - distPartAgg,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver - .getColumnInfos()), reduceSinkOperatorInfo), - groupByOutputRowResolver); + float memoryThreshold = + HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + Operator op = + putOpInsertMap(OperatorFactory.getAndMakeChild( + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + distPartAgg, groupByMemoryUsage, memoryThreshold), new RowSchema( + groupByOutputRowResolver + .getColumnInfos()), reduceSinkOperatorInfo), + groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; } @@ -2733,7 +2755,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Generate the map-side GroupByOperator for the Query Block * (qb.getParseInfo().getXXX(dest)). The new GroupByOperator will be a child * of the inputOperatorInfo. 
- * + * * @param mode * The mode of the aggregation (HASH) * @param genericUDAFEvaluators @@ -2775,7 +2797,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (!parseInfo.getDistinctFuncExprsForClause(dest).isEmpty()) { List list = parseInfo.getDistinctFuncExprsForClause(dest); int numDistn = 0; - for(ASTNode value: list) { + for (ASTNode value : list) { // 0 is function name for (int i = 1; i < value.getChildCount(); i++) { ASTNode parameter = (ASTNode) value.getChild(i); @@ -2838,11 +2860,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); - float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); - Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()), - inputOperatorInfo), groupByOutputRowResolver); + float memoryThreshold = + HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + Operator op = + putOpInsertMap(OperatorFactory.getAndMakeChild( + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false, groupByMemoryUsage, memoryThreshold), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), + inputOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; } @@ -2851,10 +2876,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Generate the ReduceSinkOperator for the Group By Query Block * (qb.getPartInfo().getXXX(dest)). The new ReduceSinkOperator will be a child * of inputOperatorInfo. - * + * * It will put all Group By keys and the distinct field (if any) in the * map-reduce sort key, and all other fields in the map-reduce value. - * + * * @param numPartitionFields * the number of fields for map-reduce partitioning. This is usually * the number of fields in the Group By keys. 
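
The javadoc just above states that the group-by ReduceSinkOperator puts all group-by keys plus any distinct expression into the map-reduce sort key, puts the remaining fields into the value, and partitions on the first numPartitionFields key columns (usually the number of group-by keys). The toy sketch below works that layout through for a query shaped like SELECT key, count(DISTINCT value) FROM src GROUP BY key; the String lists are stand-ins for Hive's ExprNodeDesc lists, not code from the patch.

import java.util.Arrays;
import java.util.List;

public class GroupByReduceSinkLayoutSketch {
  public static void main(String[] args) {
    List<String> groupByKeys = Arrays.asList("key");        // GROUP BY columns
    List<String> distinctParams = Arrays.asList("value");   // parameters of count(DISTINCT ...)

    // Sort key = group-by keys followed by the distinct parameters.
    List<String> reduceKey = new java.util.ArrayList<String>(groupByKeys);
    reduceKey.addAll(distinctParams);

    // Partitioning uses only the first numPartitionFields columns of the sort key,
    // so rows sharing the same "key" meet in one reducer while still arriving
    // sorted by "value" for the distinct aggregation.
    int numPartitionFields = groupByKeys.size();
    List<String> partitionCols = reduceKey.subList(0, numPartitionFields);

    System.out.println("reduce key:     " + reduceKey);     // [key, value]
    System.out.println("partition cols: " + partitionCols); // [key]
  }
}
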
@@ -2882,8 +2907,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, colExprMap); - List> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest, - reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames); + List> distinctColIndices = + getDistinctColIndicesForReduceSink(parseInfo, dest, + reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, + outputKeyColumnNames, colExprMap); ArrayList reduceValues = new ArrayList(); HashMap aggregationTrees = parseInfo @@ -2891,7 +2918,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (!mapAggrDone) { getReduceValuesForReduceSinkNoMapAgg(parseInfo, dest, reduceSinkInputRowResolver, - reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues); + reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues, colExprMap); } else { // Put partial aggregation results in reduceValues int inputField = reduceKeys.size(); @@ -2900,23 +2927,25 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get( inputField).getType(); - reduceValues.add(new ExprNodeColumnDesc(type, - getColumnInternalName(inputField), "", false)); + ExprNodeDesc expr = new ExprNodeColumnDesc(type, + getColumnInternalName(inputField), "", false); + reduceValues.add(expr); inputField++; outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." + getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(entry.getValue(), - new ColumnInfo(field, type, null, false)); + ColumnInfo colInfo = new ColumnInfo(field, type, null, false); + reduceSinkOutputRowResolver.putExpression(entry.getValue(), colInfo); + colExprMap.put(colInfo.getInternalName(), expr); } } ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, - grpByExprs.size(), reduceValues, distinctColIndices, - outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields, - numReducers), new RowSchema(reduceSinkOutputRowResolver - .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver); + grpByExprs.size(), reduceValues, distinctColIndices, + outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields, + numReducers), new RowSchema(reduceSinkOutputRowResolver + .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver); rsOp.setColumnExprMap(colExprMap); return rsOp; } @@ -2950,9 +2979,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return reduceKeys; } - private List> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, String dest, + private List> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, + String dest, ArrayList reduceKeys, RowResolver reduceSinkInputRowResolver, - RowResolver reduceSinkOutputRowResolver, List outputKeyColumnNames) + RowResolver reduceSinkOutputRowResolver, List outputKeyColumnNames, + Map colExprMap) throws SemanticException { List> distinctColIndices = new ArrayList>(); @@ -2986,11 +3017,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { distinctIndices.add(ri); String name = getColumnInternalName(numExprs); String field = Utilities.ReduceField.KEY.toString() + "." + colName - + ":" + i - + "." + name; + + ":" + i + + "." 
+ name; ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false); reduceSinkOutputRowResolver.putExpression(parameter, colInfo); numExprs++; + colExprMap.put(colInfo.getInternalName(), expr); } distinctColIndices.add(distinctIndices); } @@ -3001,7 +3033,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private void getReduceValuesForReduceSinkNoMapAgg(QBParseInfo parseInfo, String dest, RowResolver reduceSinkInputRowResolver, RowResolver reduceSinkOutputRowResolver, - List outputValueColumnNames, ArrayList reduceValues) + List outputValueColumnNames, ArrayList reduceValues, + Map colExprMap) throws SemanticException { HashMap aggregationTrees = parseInfo .getAggregationExprsForClause(dest); @@ -3013,15 +3046,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { for (int i = 1; i < value.getChildCount(); i++) { ASTNode parameter = (ASTNode) value.getChild(i); if (reduceSinkOutputRowResolver.getExpression(parameter) == null) { - reduceValues.add(genExprNodeDesc(parameter, - reduceSinkInputRowResolver)); + ExprNodeDesc expr = genExprNodeDesc(parameter, reduceSinkInputRowResolver); + reduceValues.add(expr); outputValueColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." + getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field, - reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, - false)); + ColumnInfo colInfo = new ColumnInfo(field, + reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, false); + reduceSinkOutputRowResolver.putExpression(parameter, colInfo); + colExprMap.put(colInfo.getInternalName(), expr); } } } @@ -3051,8 +3085,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, colExprMap); - List> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest, - reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames); + List> distinctColIndices = + getDistinctColIndicesForReduceSink(parseInfo, dest, + reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, + outputKeyColumnNames, colExprMap); ArrayList reduceValues = new ArrayList(); @@ -3061,7 +3097,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { for (String destination : dests) { getReduceValuesForReduceSinkNoMapAgg(parseInfo, destination, reduceSinkInputRowResolver, - reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues); + reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues, colExprMap); // Need to pass all of the columns used in the where clauses as reduce values ASTNode whereClause = parseInfo.getWhrForClause(destination); @@ -3071,15 +3107,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { for (int i = 0; i < columnExprs.size(); i++) { ASTNode parameter = columnExprs.get(i); if (reduceSinkOutputRowResolver.getExpression(parameter) == null) { - reduceValues.add(genExprNodeDesc(parameter, - reduceSinkInputRowResolver)); + ExprNodeDesc expr = genExprNodeDesc(parameter, + reduceSinkInputRowResolver); + reduceValues.add(expr); outputValueColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." 
+ getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field, + ColumnInfo colInfo = new ColumnInfo(field, reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, - false)); + false); + reduceSinkOutputRowResolver.putExpression(parameter, colInfo); + colExprMap.put(colInfo.getInternalName(), expr); } } } @@ -3087,17 +3126,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, - grpByExprs.size(), reduceValues, distinctColIndices, - outputKeyColumnNames, outputValueColumnNames, true, -1, grpByExprs.size(), - -1), new RowSchema(reduceSinkOutputRowResolver - .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver); + grpByExprs.size(), reduceValues, distinctColIndices, + outputKeyColumnNames, outputValueColumnNames, true, -1, grpByExprs.size(), + -1), new RowSchema(reduceSinkOutputRowResolver + .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver); rsOp.setColumnExprMap(colExprMap); return rsOp; } /** * Given an ASTNode, it returns all of the descendant ASTNodes which represent column expressions - * + * * @param node * @param inputRR * @return @@ -3111,10 +3150,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return nodes; } for (int i = 0; i < node.getChildCount(); i++) { - ASTNode child = (ASTNode)node.getChild(i); + ASTNode child = (ASTNode) node.getChild(i); if (child.getType() == HiveParser.TOK_TABLE_OR_COL && child.getChild(0) != null && inputRR.get(null, - BaseSemanticAnalyzer.unescapeIdentifier(child.getChild(0).getText())) != null) { + BaseSemanticAnalyzer.unescapeIdentifier(child.getChild(0).getText())) != null) { nodes.add(child); } else { nodes.addAll(getColumnExprsFromASTNode(child, inputRR)); @@ -3127,10 +3166,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Generate the second ReduceSinkOperator for the Group By Plan * (parseInfo.getXXX(dest)). The new ReduceSinkOperator will be a child of * groupByOperatorInfo. - * + * * The second ReduceSinkOperator will put the group by keys in the map-reduce * sort key, and put the partial aggregation results in the map-reduce value. - * + * * @param numPartitionFields * the number of fields in the map-reduce partition key. This should * always be the same as the number of Group By keys. We should be @@ -3176,20 +3215,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ASTNode t = entry.getValue(); TypeInfo typeInfo = reduceSinkInputRowResolver2.getExpression(t) .getType(); - reduceValues.add(new ExprNodeColumnDesc(typeInfo, field, "", false)); + ExprNodeColumnDesc inputExpr = new ExprNodeColumnDesc(typeInfo, field, + "", false); + reduceValues.add(inputExpr); inputField++; String col = getColumnInternalName(reduceValues.size() - 1); outputColumnNames.add(col); - reduceSinkOutputRowResolver2.putExpression(t, new ColumnInfo( - Utilities.ReduceField.VALUE.toString() + "." + col, typeInfo, "", - false)); + ColumnInfo colInfo = new ColumnInfo(Utilities.ReduceField.VALUE.toString() + + "." 
+ col, typeInfo, "", false); + reduceSinkOutputRowResolver2.putExpression(t, colInfo); + colExprMap.put(colInfo.getInternalName(), inputExpr); } ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, - reduceValues, outputColumnNames, true, -1, numPartitionFields, - numReducers), new RowSchema(reduceSinkOutputRowResolver2 - .getColumnInfos()), groupByOperatorInfo), + reduceValues, outputColumnNames, true, -1, numPartitionFields, + numReducers), new RowSchema(reduceSinkOutputRowResolver2 + .getColumnInfos()), groupByOperatorInfo), reduceSinkOutputRowResolver2); rsOp.setColumnExprMap(colExprMap); @@ -3200,7 +3242,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Generate the second GroupByOperator for the Group By Plan * (parseInfo.getXXX(dest)). The new GroupByOperator will do the second * aggregation based on the partial aggregation results. - * + * * @param mode * the mode of aggregation (FINAL) * @param genericUDAFEvaluators @@ -3252,12 +3294,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { assert (paraExpression != null); aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(), paraExpression, paraExprInfo.getTabAlias(), paraExprInfo - .getIsVirtualCol())); + .getIsVirtualCol())); String aggName = value.getChild(0).getText(); boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI; - boolean isStar = value.getType() == HiveParser.TOK_FUNCTIONSTAR; Mode amode = groupByDescModeToUDAFMode(mode, isDistinct); GenericUDAFEvaluator genericUDAFEvaluator = genericUDAFEvaluators .get(entry.getKey()); @@ -3270,7 +3311,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { udaf.genericUDAFEvaluator, udaf.convertedParameters, (mode != GroupByDesc.Mode.FINAL && value.getToken().getType() == - HiveParser.TOK_FUNCTIONDI), + HiveParser.TOK_FUNCTIONDI), amode)); String field = getColumnInternalName(groupByKeys.size() + aggregations.size() - 1); @@ -3279,11 +3320,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { field, udaf.returnType, "", false)); } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); - float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); - Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver2.getColumnInfos()), - reduceSinkOperatorInfo2), groupByOutputRowResolver2); + float memoryThreshold = + HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + Operator op = + putOpInsertMap(OperatorFactory.getAndMakeChild( + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false, groupByMemoryUsage, memoryThreshold), new RowSchema( + groupByOutputRowResolver2.getColumnInfos()), + reduceSinkOperatorInfo2), groupByOutputRowResolver2); op.setColumnExprMap(colExprMap); return op; } @@ -3291,26 +3335,26 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Generate a Group-By plan using a single map-reduce job (3 operators will be * inserted): - * + * * ReduceSink ( keys = (K1_EXP, K2_EXP, DISTINCT_EXP), values = (A1_EXP, * A2_EXP) ) SortGroupBy (keys = (KEY.0,KEY.1), aggregations = * (count_distinct(KEY.2), sum(VALUE.0), count(VALUE.1))) Select (final * selects). 
- * + * * @param dest * @param qb * @param input * @return * @throws SemanticException - * + * * Generate a Group-By plan using 1 map-reduce job. Spray by the * group by key, and sort by the distinct key (if any), and compute * aggregates * The aggregation evaluation functions are as * follows: Partitioning Key: grouping key - * + * * Sorting Key: grouping key if no DISTINCT grouping + distinct key * if DISTINCT - * + * * Reducer: iterate/merge (mode = COMPLETE) **/ @SuppressWarnings({"nls"}) @@ -3355,7 +3399,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (whereExpr != null) { OpParseContext inputCtx = opParseCtx.get(input); RowResolver inputRR = inputCtx.getRowResolver(); - ExprNodeDesc current = genExprNodeDesc((ASTNode)whereExpr.getChild(0), inputRR); + ExprNodeDesc current = genExprNodeDesc((ASTNode) whereExpr.getChild(0), inputRR); // Check the list of where expressions already added so they aren't duplicated ExprNodeDesc.ExprNodeDescEqualityWrapper currentWrapped = @@ -3392,8 +3436,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { FilterDesc orFilterDesc = new FilterDesc(previous, false); selectInput = putOpInsertMap(OperatorFactory.getAndMakeChild( - orFilterDesc, new RowSchema( - inputRR.getColumnInfos()), input), inputRR); + orFilterDesc, new RowSchema( + inputRR.getColumnInfos()), input), inputRR); } // insert a select operator here used by the ColumnPruner to reduce @@ -3440,30 +3484,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Generate a Multi Group-By plan using a 2 map-reduce jobs. - * + * * @param dest * @param qb * @param input * @return * @throws SemanticException - * + * * Generate a Group-By plan using a 2 map-reduce jobs. Spray by the * distinct key in hope of getting a uniform distribution, and * compute partial aggregates by the grouping key. Evaluate partial * aggregates first, and spray by the grouping key to compute actual * aggregates in the second phase. The agggregation evaluation * functions are as follows: Partitioning Key: distinct key - * + * * Sorting Key: distinct key - * + * * Reducer: iterate/terminatePartial (mode = PARTIAL1) - * + * * STAGE 2 - * + * * Partitioning Key: grouping key - * + * * Sorting Key: grouping key - * + * * Reducer: merge/terminate (mode = FINAL) */ @SuppressWarnings("nls") @@ -3472,7 +3516,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // ////// Generate GroupbyOperator for a map-side partial aggregation Map genericUDAFEvaluators = - new LinkedHashMap(); + new LinkedHashMap(); QBParseInfo parseInfo = qb.getParseInfo(); @@ -3498,20 +3542,20 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Generate a Group-By plan using a 2 map-reduce jobs (5 operators will be * inserted): - * + * * ReduceSink ( keys = (K1_EXP, K2_EXP, DISTINCT_EXP), values = (A1_EXP, * A2_EXP) ) NOTE: If DISTINCT_EXP is null, partition by rand() SortGroupBy * (keys = (KEY.0,KEY.1), aggregations = (count_distinct(KEY.2), sum(VALUE.0), * count(VALUE.1))) ReduceSink ( keys = (0,1), values=(2,3,4)) SortGroupBy * (keys = (KEY.0,KEY.1), aggregations = (sum(VALUE.0), sum(VALUE.1), * sum(VALUE.2))) Select (final selects). - * + * * @param dest * @param qb * @param input * @return * @throws SemanticException - * + * * Generate a Group-By plan using a 2 map-reduce jobs. 
Spray by the * grouping key and distinct key (or a random number, if no distinct * is present) in hope of getting a uniform distribution, and @@ -3521,19 +3565,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * phase. The agggregation evaluation functions are as follows: * Partitioning Key: random() if no DISTINCT grouping + distinct key * if DISTINCT - * + * * Sorting Key: grouping key if no DISTINCT grouping + distinct key * if DISTINCT - * + * * Reducer: iterate/terminatePartial (mode = PARTIAL1) - * + * * STAGE 2 - * + * * Partitioning Key: grouping key - * + * * Sorting Key: grouping key if no DISTINCT grouping + distinct key * if DISTINCT - * + * * Reducer: merge/terminate (mode = FINAL) */ @SuppressWarnings("nls") @@ -3551,11 +3595,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // captured by WritableComparableHiveObject.hashCode() function. Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb, dest, input, (parseInfo.getDistinctFuncExprsForClause(dest).isEmpty() ? - -1 : Integer.MAX_VALUE), -1, false); + -1 : Integer.MAX_VALUE), -1, false); // ////// 2. Generate GroupbyOperator Map genericUDAFEvaluators = - new LinkedHashMap(); + new LinkedHashMap(); GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanGroupByOperator( parseInfo, dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIAL1, genericUDAFEvaluators); @@ -3597,15 +3641,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * we may turn off map-side partial aggregation based on its performance. Then * spray by the group by key, and sort by the distinct key (if any), and * compute aggregates based on actual aggregates - * + * * The agggregation evaluation functions are as follows: Mapper: * iterate/terminatePartial (mode = HASH) - * + * * Partitioning Key: grouping key - * + * * Sorting Key: grouping key if no DISTINCT grouping + distinct key if * DISTINCT - * + * * Reducer: iterate/terminate if DISTINCT merge/terminate if NO DISTINCT (mode * = MERGEPARTIAL) */ @@ -3617,7 +3661,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // ////// Generate GroupbyOperator for a map-side partial aggregation Map genericUDAFEvaluators = - new LinkedHashMap(); + new LinkedHashMap(); GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator( qb, dest, inputOperatorInfo, GroupByDesc.Mode.HASH, genericUDAFEvaluators); @@ -3661,23 +3705,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * key). Evaluate partial aggregates first, and spray by the grouping key to * compute actual aggregates in the second phase. 
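The plan described above is the skew-handling variant: the first job sprays on the grouping plus distinct key and only produces partial aggregates (PARTIAL1), and the second job re-shuffles on the grouping key alone and merges them (FINAL). A self-contained toy run of that idea for COUNT(DISTINCT v) grouped by k, with two plain Java maps standing in for the two shuffles; none of this is Hive API, it is only meant to make the two stages concrete:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoStageDistinctSketch {
  public static void main(String[] args) {
    // Input (k, v) pairs; key "a" is skewed.
    List<String[]> rows = new ArrayList<String[]>();
    rows.add(new String[] {"a", "x"});
    rows.add(new String[] {"a", "x"});
    rows.add(new String[] {"a", "y"});
    rows.add(new String[] {"b", "x"});

    // Stage 1: partition on (k, v); each distinct pair survives once with a partial count of 1.
    Map<String, Integer> partials = new HashMap<String, Integer>();
    for (String[] r : rows) {
      partials.put(r[0] + "\u0001" + r[1], Integer.valueOf(1));
    }

    // Stage 2: partition on k only and merge the partial counts.
    Map<String, Integer> result = new HashMap<String, Integer>();
    for (Map.Entry<String, Integer> e : partials.entrySet()) {
      String k = e.getKey().split("\u0001")[0];
      Integer old = result.get(k);
      result.put(k, old == null ? e.getValue() : Integer.valueOf(old.intValue() + e.getValue().intValue()));
    }
    System.out.println(result); // a=2, b=1 (map iteration order may vary)
  }
}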
The agggregation evaluation * functions are as follows: Mapper: iterate/terminatePartial (mode = HASH) - * + * * Partitioning Key: random() if no DISTINCT grouping + distinct key if * DISTINCT - * + * * Sorting Key: grouping key if no DISTINCT grouping + distinct key if * DISTINCT - * + * * Reducer: iterate/terminatePartial if DISTINCT merge/terminatePartial if NO * DISTINCT (mode = MERGEPARTIAL) - * + * * STAGE 2 - * + * * Partitioining Key: grouping key - * + * * Sorting Key: grouping key if no DISTINCT grouping + distinct key if * DISTINCT - * + * * Reducer: merge/terminate (mode = FINAL) */ @SuppressWarnings("nls") @@ -3688,7 +3732,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // ////// Generate GroupbyOperator for a map-side partial aggregation Map genericUDAFEvaluators = - new LinkedHashMap(); + new LinkedHashMap(); GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator( qb, dest, inputOperatorInfo, GroupByDesc.Mode.HASH, genericUDAFEvaluators); @@ -3703,8 +3747,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // ////// Generate ReduceSink Operator Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb, dest, groupByOperatorInfo, (parseInfo - .getDistinctFuncExprsForClause(dest).isEmpty() ? -1 - : Integer.MAX_VALUE), -1, true); + .getDistinctFuncExprsForClause(dest).isEmpty() ? -1 + : Integer.MAX_VALUE), -1, true); // ////// Generate GroupbyOperator for a partial aggregation Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo, @@ -3729,7 +3773,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // ////// Generate ReduceSink Operator Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb, dest, groupByOperatorInfo, getGroupByForClause(parseInfo, dest) - .size(), 1, true); + .size(), 1, true); return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo, GroupByDesc.Mode.FINAL, genericUDAFEvaluators); @@ -3759,10 +3803,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } private int getReducersBucketing(int totalFiles, int maxReducers) { - int numFiles = totalFiles/maxReducers; + int numFiles = totalFiles / maxReducers; while (true) { - if (totalFiles%numFiles == 0) { - return totalFiles/numFiles; + if (totalFiles % numFiles == 0) { + return totalFiles / numFiles; } numFiles++; } @@ -3771,8 +3815,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private static class SortBucketRSCtx { ArrayList partnCols; boolean multiFileSpray; - int numFiles; - int totalFiles; + int numFiles; + int totalFiles; public SortBucketRSCtx() { partnCols = null; @@ -3789,7 +3833,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } /** - * @param partnCols the partnCols to set + * @param partnCols + * the partnCols to set */ public void setPartnCols(ArrayList partnCols) { this.partnCols = partnCols; @@ -3803,7 +3848,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } /** - * @param multiFileSpray the multiFileSpray to set + * @param multiFileSpray + * the multiFileSpray to set */ public void setMultiFileSpray(boolean multiFileSpray) { this.multiFileSpray = multiFileSpray; @@ -3817,7 +3863,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } /** - * @param numFiles the numFiles to set + * @param numFiles + * the numFiles to set */ public void setNumFiles(int numFiles) { this.numFiles = numFiles; @@ -3831,7 +3878,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } /** 
- * @param totalFiles the totalFiles to set + * @param totalFiles + * the totalFiles to set */ public void setTotalFiles(int totalFiles) { this.totalFiles = totalFiles; @@ -3839,8 +3887,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } @SuppressWarnings("nls") - private Operator genBucketingSortingDest(String dest, Operator input, QB qb, TableDesc table_desc, - Table dest_tab, SortBucketRSCtx ctx) + private Operator genBucketingSortingDest(String dest, Operator input, QB qb, + TableDesc table_desc, + Table dest_tab, SortBucketRSCtx ctx) throws SemanticException { // If the table is bucketed, and bucketing is enforced, do the following: @@ -3850,20 +3899,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // spray the data into multiple buckets. That way, we can support a very large // number of buckets without needing a very large number of reducers. boolean enforceBucketing = false; - boolean enforceSorting = false; + boolean enforceSorting = false; ArrayList partnCols = new ArrayList(); ArrayList partnColsNoConvert = new ArrayList(); - ArrayList sortCols = new ArrayList(); + ArrayList sortCols = new ArrayList(); ArrayList sortOrders = new ArrayList(); boolean multiFileSpray = false; - int numFiles = 1; - int totalFiles = 1; + int numFiles = 1; + int totalFiles = 1; if ((dest_tab.getNumBuckets() > 0) && (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) { enforceBucketing = true; partnCols = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true); - partnColsNoConvert = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, false); + partnColsNoConvert = + getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, false); } if ((dest_tab.getSortCols() != null) && @@ -3880,7 +3930,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (enforceBucketing || enforceSorting) { int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); - int numBuckets = dest_tab.getNumBuckets(); + int numBuckets = dest_tab.getNumBuckets(); if (numBuckets > maxReducers) { multiFileSpray = true; totalFiles = numBuckets; @@ -3890,7 +3940,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { else { // find the number of reducers such that it is a divisor of totalFiles maxReducers = getReducersBucketing(totalFiles, maxReducers); - numFiles = totalFiles/maxReducers; + numFiles = totalFiles / maxReducers; } } else { @@ -3903,7 +3953,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ctx.setNumFiles(numFiles); ctx.setPartnCols(partnColsNoConvert); ctx.setTotalFiles(totalFiles); - //disable "merge mapfiles" and "merge mapred files". + // disable "merge mapfiles" and "merge mapred files". HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES, false); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); } @@ -3912,6 +3962,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Check for HOLD_DDLTIME hint. + * * @param qb * @return true if HOLD_DDLTIME is set, false otherwise. 
*/ @@ -3937,7 +3988,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { QBMetaData qbm = qb.getMetaData(); Integer dest_type = qbm.getDestTypeForAlias(dest); - Table dest_tab = null; // destination table if any + Table dest_tab = null; // destination table if any Partition dest_part = null;// destination partition if any String queryTmpdir = null; // the intermediate destination directory Path dest_path = null; // the final destination directory @@ -3957,7 +4008,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Is the user trying to insert into a external tables if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) && (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) { - throw new SemanticException( + throw new SemanticException( ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName())); } @@ -3967,17 +4018,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // check for partition List parts = dest_tab.getPartitionKeys(); if (parts != null && parts.size() > 0) { // table is partitioned - if (partSpec== null || partSpec.size() == 0) { // user did NOT specify partition + if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getDestForClause(dest), - ErrorMsg.NEED_PARTITION_ERROR.getMsg())); + qb.getParseInfo().getDestForClause(dest), + ErrorMsg.NEED_PARTITION_ERROR.getMsg())); } // the HOLD_DDLTIIME hint should not be used with dynamic partition since the // newly generated partitions should always update their DDLTIME if (holdDDLTime) { throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getDestForClause(dest), - ErrorMsg.HOLD_DDLTIME_ON_NONEXIST_PARTITIONS.getMsg())); + qb.getParseInfo().getDestForClause(dest), + ErrorMsg.HOLD_DDLTIME_ON_NONEXIST_PARTITIONS.getMsg())); } dpCtx = qbm.getDPCtx(dest); if (dpCtx == null) { @@ -3991,8 +4042,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP if (dpCtx.getNumDPCols() > 0 && (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES) || - HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) && - Utilities.supportCombineFileInputFormat() == false) { + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) && + Utilities.supportCombineFileInputFormat() == false) { // Do not support merge for Hadoop versions (pre-0.20) that do not // support CombineHiveInputFormat HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES, false); @@ -4003,8 +4054,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } else { // QBMetaData.DEST_PARTITION capture the all-SP case throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getDestForClause(dest), - ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())); + qb.getParseInfo().getDestForClause(dest), + ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())); } if (dpCtx.getSPPath() != null) { dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath()); @@ -4069,7 +4120,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { else { try { String ppath = dpCtx.getSPPath(); - ppath = ppath.substring(0, ppath.length()-1); + ppath = ppath.substring(0, ppath.length() - 1); DummyPartition p = new DummyPartition(dest_tab, dest_tab.getDbName() + "@" + dest_tab.getTableName() + "@" + ppath, @@ -4089,15 +4140,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { 
dest_tab = dest_part.getTable(); if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) && dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE)) { - throw new SemanticException( + throw new SemanticException( ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName())); } Path tabPath = dest_tab.getPath(); Path partPath = dest_part.getPartitionPath(); - // if the table is in a different dfs than the partition, - // replace the partition's dfs with the table's dfs. + // if the table is in a different dfs than the partition, + // replace the partition's dfs with the table's dfs. dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri() .getAuthority(), partPath.toUri().getPath()); @@ -4121,8 +4172,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Partition part = db.getPartition(dest_tab, dest_part.getSpec(), false); if (part == null) { throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getDestForClause(dest), - ErrorMsg.HOLD_DDLTIME_ON_NONEXIST_PARTITIONS.getMsg())); + qb.getParseInfo().getDestForClause(dest), + ErrorMsg.HOLD_DDLTIME_ON_NONEXIST_PARTITIONS.getMsg())); } } catch (HiveException e) { throw new SemanticException(e); @@ -4269,7 +4320,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { for (int i = 0; i < fields.size(); i++) { vecCol.add(new ColumnInfo(fields.get(i).getFieldName(), TypeInfoUtils .getTypeInfoFromObjectInspector(fields.get(i) - .getFieldObjectInspector()), "", false)); + .getFieldObjectInspector()), "", false)); } } catch (Exception e) { throw new SemanticException(e.getMessage(), e); @@ -4278,19 +4329,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { RowSchema fsRS = new RowSchema(vecCol); FileSinkDesc fileSinkDesc = new FileSinkDesc( - queryTmpdir, - table_desc, - conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), - currentTableId, - rsCtx.isMultiFileSpray(), - rsCtx.getNumFiles(), - rsCtx.getTotalFiles(), - rsCtx.getPartnCols(), - dpCtx); + queryTmpdir, + table_desc, + conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), + currentTableId, + rsCtx.isMultiFileSpray(), + rsCtx.getNumFiles(), + rsCtx.getTotalFiles(), + rsCtx.getPartnCols(), + dpCtx); // set the stats publishing/aggregating key prefix // the same as directory name. The directory name - // can be changed in the optimizer but the key should not be changed + // can be changed in the optimizer but the key should not be changed // it should be the same as the MoveWork's sourceDir. 
fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName()); @@ -4306,16 +4357,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(fileSinkDesc, - fsRS, input), inputRR); + fsRS, input), inputRR); if (ltd != null && SessionState.get() != null) { SessionState.get().getLineageState() - .mapDirToFop(ltd.getSourceDir(), (FileSinkOperator)output); + .mapDirToFop(ltd.getSourceDir(), (FileSinkOperator) output); } if (LOG.isDebugEnabled()) { LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " - + dest_path + " row schema: " + inputRR.toString()); + + dest_path + " row schema: " + inputRR.toString()); } return output; @@ -4346,7 +4397,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { int inColumnCnt = rowFields.size(); int outColumnCnt = tableFields.size(); if (dynPart && dpCtx != null) { - outColumnCnt += dpCtx.getNumDPCols(); + outColumnCnt += dpCtx.getNumDPCols(); } if (inColumnCnt != outColumnCnt) { @@ -4354,7 +4405,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { + " columns, but query has " + inColumnCnt + " columns."; throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg( qb.getParseInfo().getDestForClause(dest), reason)); - } else if (dynPart && dpCtx != null){ + } else if (dynPart && dpCtx != null) { // create the mapping from input ExprNode to dest table DP column dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size())); } @@ -4385,8 +4436,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // JSON-format. if (!tableFieldTypeInfo.equals(rowFieldTypeInfo) && !(isLazySimpleSerDe - && tableFieldTypeInfo.getCategory().equals(Category.PRIMITIVE) && tableFieldTypeInfo - .equals(TypeInfoFactory.stringTypeInfo))) { + && tableFieldTypeInfo.getCategory().equals(Category.PRIMITIVE) && tableFieldTypeInfo + .equals(TypeInfoFactory.stringTypeInfo))) { // need to do some conversions here converted = true; if (tableFieldTypeInfo.getCategory() != Category.PRIMITIVE) { @@ -4411,7 +4462,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // deal with dynamic partition columns: convert ExprNodeDesc type to String?? 
if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) { // DP columns starts with tableFields.size() - for (int i = tableFields.size(); i < rowFields.size(); ++i ) { + for (int i = tableFields.size(); i < rowFields.size(); ++i) { TypeInfo rowFieldTypeInfo = rowFields.get(i).getType(); ExprNodeDesc column = new ExprNodeColumnDesc( rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", false); @@ -4432,7 +4483,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new SelectDesc(expressions, colName), new RowSchema(rowResolver - .getColumnInfos()), input), rowResolver); + .getColumnInfos()), input), rowResolver); return output; } else { @@ -4462,7 +4513,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (LOG.isDebugEnabled()) { LOG.debug("Created LimitOperator Plan for clause: " + dest - + " row schema: " + inputRR.toString()); + + " row schema: " + inputRR.toString()); } return limitMap; @@ -4492,7 +4543,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (LOG.isDebugEnabled()) { LOG.debug("Table alias: " + outputTableAlias + " Col aliases: " - + colAliases); + + colAliases); } // Use the RowResolver from the input operator to generate a input @@ -4519,7 +4570,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (numUdtfCols != numSuppliedAliases) { throw new SemanticException(ErrorMsg.UDTF_ALIAS_MISMATCH .getMsg("expected " + numUdtfCols + " aliases " + "but got " - + numSuppliedAliases)); + + numSuppliedAliases)); } // Generate the output column info's / row resolver using internal names. @@ -4575,10 +4626,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } private ArrayList getParitionColsFromBucketCols(String dest, QB qb, Table tab, - TableDesc table_desc, Operator input, boolean convert) - throws SemanticException { + TableDesc table_desc, Operator input, boolean convert) + throws SemanticException { List tabBucketCols = tab.getBucketCols(); - List tabCols = tab.getCols(); + List tabCols = tab.getCols(); // Partition by the bucketing column List posns = new ArrayList(); @@ -4597,8 +4648,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return genConvertCol(dest, qb, tab, table_desc, input, posns, convert); } - private ArrayList genConvertCol(String dest, QB qb, Table tab, TableDesc table_desc, Operator input, - List posns, boolean convert) throws SemanticException { + private ArrayList genConvertCol(String dest, QB qb, Table tab, + TableDesc table_desc, Operator input, + List posns, boolean convert) throws SemanticException { StructObjectInspector oi = null; try { Deserializer deserializer = table_desc.getDeserializerClass() @@ -4616,12 +4668,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Check column type int columnNumber = posns.size(); ArrayList expressions = new ArrayList(columnNumber); - for (Integer posn: posns) { + for (Integer posn : posns) { ObjectInspector tableFieldOI = tableFields.get(posn).getFieldObjectInspector(); TypeInfo tableFieldTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(tableFieldOI); TypeInfo rowFieldTypeInfo = rowFields.get(posn).getType(); - ExprNodeDesc column = new ExprNodeColumnDesc(rowFieldTypeInfo, rowFields.get(posn).getInternalName(), - rowFields.get(posn).getTabAlias(), rowFields.get(posn).getIsVirtualCol()); + ExprNodeDesc column = + new ExprNodeColumnDesc(rowFieldTypeInfo, rowFields.get(posn).getInternalName(), + rowFields.get(posn).getTabAlias(), 
rowFields.get(posn).getIsVirtualCol()); if (convert && !tableFieldTypeInfo.equals(rowFieldTypeInfo)) { // need to do some conversions here @@ -4630,14 +4683,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { column = null; } else { column = TypeCheckProcFactory.DefaultExprProcessor - .getFuncExprNodeDesc(tableFieldTypeInfo.getTypeName(), - column); + .getFuncExprNodeDesc(tableFieldTypeInfo.getTypeName(), + column); } if (column == null) { String reason = "Cannot convert column " + posn + " from " - + rowFieldTypeInfo + " to " + tableFieldTypeInfo + "."; + + rowFieldTypeInfo + " to " + tableFieldTypeInfo + "."; throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH - .getMsg(qb.getParseInfo().getDestForClause(dest), reason)); + .getMsg(qb.getParseInfo().getDestForClause(dest), reason)); } } expressions.add(column); @@ -4646,11 +4699,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return expressions; } - private ArrayList getSortCols(String dest, QB qb, Table tab, TableDesc table_desc, Operator input, boolean convert) - throws SemanticException { + private ArrayList getSortCols(String dest, QB qb, Table tab, TableDesc table_desc, + Operator input, boolean convert) + throws SemanticException { RowResolver inputRR = opParseCtx.get(input).getRowResolver(); List tabSortCols = tab.getSortCols(); - List tabCols = tab.getCols(); + List tabCols = tab.getCols(); // Partition by the bucketing column List posns = new ArrayList(); @@ -4670,10 +4724,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } private ArrayList getSortOrders(String dest, QB qb, Table tab, Operator input) - throws SemanticException { - RowResolver inputRR = opParseCtx.get(input).getRowResolver(); + throws SemanticException { + opParseCtx.get(input).getRowResolver(); List tabSortCols = tab.getSortCols(); - List tabCols = tab.getCols(); + List tabCols = tab.getCols(); ArrayList orders = new ArrayList(); for (Order sortCol : tabSortCols) { @@ -4689,11 +4743,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { @SuppressWarnings("nls") private Operator genReduceSinkPlanForSortingBucketing(Table tab, Operator input, - ArrayList sortCols, - List sortOrders, - ArrayList partitionCols, - int numReducers) - throws SemanticException { + ArrayList sortCols, + List sortOrders, + ArrayList partitionCols, + int numReducers) + throws SemanticException { RowResolver inputRR = opParseCtx.get(input).getRowResolver(); // For the generation of the values expression just get the inputs @@ -4715,12 +4769,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { StringBuilder order = new StringBuilder(); for (int sortOrder : sortOrders) { - order.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? '+' :'-'); + order.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? 
'+' : '-'); } Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1, - partitionCols, order.toString(), numReducers), + partitionCols, order.toString(), numReducers), new RowSchema(inputRR.getColumnInfos()), input), inputRR); interim.setColumnExprMap(colExprMap); @@ -4738,12 +4792,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, - Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema( - out_rwsch.getColumnInfos()), interim), out_rwsch); + Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema( + out_rwsch.getColumnInfos()), interim), out_rwsch); if (LOG.isDebugEnabled()) { LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() + - " row schema: " + out_rwsch.toString()); + " row schema: " + out_rwsch.toString()); } return output; @@ -4786,7 +4840,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { "strict") && limit == null) { throw new SemanticException(generateErrorMessage(sortExprs, - ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg())); + ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg())); } } } @@ -4833,7 +4887,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1, - partitionCols, order.toString(), numReducers), + partitionCols, order.toString(), numReducers), new RowSchema(inputRR.getColumnInfos()), input), inputRR); interim.setColumnExprMap(colExprMap); @@ -4851,12 +4905,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, - Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema( - out_rwsch.getColumnInfos()), interim), out_rwsch); + Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema( + out_rwsch.getColumnInfos()), interim), out_rwsch); if (LOG.isDebugEnabled()) { LOG.debug("Created ReduceSink Plan for clause: " + dest + " row schema: " - + out_rwsch.toString()); + + out_rwsch.toString()); } return output; } @@ -4875,7 +4929,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Map colExprMap = new HashMap(); HashMap> posToAliasMap = new HashMap>(); HashMap> filterMap = - new HashMap>(); + new HashMap>(); for (int pos = 0; pos < right.length; ++pos) { @@ -5014,9 +5068,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, - reduceValues, outputColumns, false, joinTree.getNextTag(), - reduceKeys.size(), numReds), new RowSchema(outputRS - .getColumnInfos()), child), outputRS); + reduceValues, outputColumns, false, joinTree.getNextTag(), + reduceKeys.size(), numReds), new RowSchema(outputRS + .getColumnInfos()), child), outputRS); rsOp.setColumnExprMap(colExprMap); return rsOp; } @@ -5084,7 +5138,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Construct a selection operator for semijoin that filter out all fields * other than the group by keys. 
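The javadoc above covers the semijoin helper: it projects away everything except the join keys so that the group-by which follows can deduplicate the filtered side before the actual join. A toy, Hive-free version of that projection plus dedup step, with invented rows:

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class SemijoinProjectSketch {
  public static void main(String[] args) {
    // Rows of the semijoined side as (key, payload); LEFT SEMI JOIN only needs the key.
    List<String[]> rows = Arrays.asList(
        new String[] {"k1", "ignored"},
        new String[] {"k1", "also ignored"},
        new String[] {"k2", "x"});
    // Keep only the key column and deduplicate, which is what the select + group-by pair achieves.
    Set<String> semijoinKeys = new LinkedHashSet<String>();
    for (String[] row : rows) {
      semijoinKeys.add(row[0]);
    }
    System.out.println(semijoinKeys); // [k1, k2]
  }
}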
- * + * * @param fields * list of fields need to be output * @param input @@ -5110,7 +5164,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // create selection operator Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new SelectDesc(colList, columnNames, false), new RowSchema(inputRR - .getColumnInfos()), input), inputRR); + .getColumnInfos()), input), inputRR); output.setColumnExprMap(input.getColumnExprMap()); return output; @@ -5159,11 +5213,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Generate group-by operator float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); - float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); - Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()), - inputOperatorInfo), groupByOutputRowResolver); + float memoryThreshold = + HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + Operator op = + putOpInsertMap(OperatorFactory.getAndMakeChild( + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false, groupByMemoryUsage, memoryThreshold), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), + inputOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; @@ -5195,7 +5252,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (commonType == null) { throw new SemanticException( "Cannot do equality join on different types: " + a.getTypeName() - + " and " + b.getTypeName()); + + " and " + b.getTypeName()); } } // Add implicit type conversion if necessary @@ -5204,7 +5261,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { keys.get(i).set( k, TypeCheckProcFactory.DefaultExprProcessor.getFuncExprNodeDesc( - commonType.getTypeName(), keys.get(i).get(k))); + commonType.getTypeName(), keys.get(i).get(k))); } } } @@ -5303,7 +5360,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { String alias = child.getChildCount() == 1 ? tableName : unescapeIdentifier(child.getChild(child.getChildCount() - 1) - .getText().toLowerCase()); + .getText().toLowerCase()); if (i == 0) { leftAliases.add(alias); @@ -5404,10 +5461,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if ((left.getToken().getType() == HiveParser.TOK_TABREF) || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)) { String tableName = getUnescapedUnqualifiedTableName((ASTNode) left.getChild(0)) - .toLowerCase(); + .toLowerCase(); String alias = left.getChildCount() == 1 ? tableName : unescapeIdentifier(left.getChild(left.getChildCount() - 1) - .getText().toLowerCase()); + .getText().toLowerCase()); joinTree.setLeftAlias(alias); String[] leftAliases = new String[1]; leftAliases[0] = alias; @@ -5432,10 +5489,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if ((right.getToken().getType() == HiveParser.TOK_TABREF) || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)) { String tableName = getUnescapedUnqualifiedTableName((ASTNode) right.getChild(0)) - .toLowerCase(); + .toLowerCase(); String alias = right.getChildCount() == 1 ? 
tableName : unescapeIdentifier(right.getChild(right.getChildCount() - 1) - .getText().toLowerCase()); + .getText().toLowerCase()); String[] rightAliases = new String[1]; rightAliases[0] = alias; joinTree.setRightAliases(rightAliases); @@ -5467,7 +5524,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { joinTree.setFilters(filters); ArrayList> filtersForPushing = - new ArrayList>(); + new ArrayList>(); filtersForPushing.add(new ArrayList()); filtersForPushing.add(new ArrayList()); joinTree.setFiltersForPushing(filtersForPushing); @@ -5569,7 +5626,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ArrayList nns = node.getNullSafes(); ArrayList tns = target.getNullSafes(); for (int i = 0; i < tns.size(); i++) { - tns.set(i, tns.get(i) & nns.get(i)); // any of condition contains non-NS, non-NS + tns.set(i, tns.get(i) & nns.get(i)); // any of condition contains non-NS, non-NS } ArrayList> filters = target.getFilters(); @@ -5741,7 +5798,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( new SelectDesc(colList, columnNames, true), new RowSchema(inputRR - .getColumnInfos()), input), inputRR); + .getColumnInfos()), input), inputRR); output.setColumnExprMap(columnExprMap); return output; } @@ -5788,7 +5845,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } List currASTList = new ArrayList(); - for (ASTNode value: list) { + for (ASTNode value : list) { // 0 is function name for (int i = 1; i < value.getChildCount(); i++) { ASTNode parameter = (ASTNode) value.getChild(i); @@ -5858,6 +5915,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { reduceValues.size() - 1).getTypeInfo(), "", false); reduceSinkOutputRowResolver.putExpression(grpbyExpr, colInfo); outputColumnNames.add(getColumnInternalName(reduceValues.size() - 1)); + colExprMap.put(colInfo.getInternalName(), grpByExprNode); } } @@ -5884,6 +5942,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { reduceSinkOutputRowResolver.putExpression(paraExpr, colInfo); outputColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); + colExprMap.put(colInfo.getInternalName(), paraExprNode); } } } @@ -5891,8 +5950,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, - reduceValues, outputColumnNames, true, -1, reduceKeys.size(), -1), - new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), input), + reduceValues, outputColumnNames, true, -1, reduceKeys.size(), -1), + new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), input), reduceSinkOutputRowResolver); rsOp.setColumnExprMap(colExprMap); @@ -5900,7 +5959,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } // Groups the clause names into lists so that any two clauses in the same list has the same - // group by and distinct keys and no clause appears in more than one list. Returns a list of the + // group by and distinct keys and no clause appears in more than one list. Returns a list of the // lists of clauses. 
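The comment above documents getCommonGroupByDestGroups, which follows next: destinations of a multi-insert whose group-by/distinct keys match are collected into one list so they can share a single ReduceSink. A dependency-free sketch of that bucketing; the clause names and key sets are invented, and plain string sets stand in for the ExprNodeDescEqualityWrapper sets the real method compares:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class CommonGroupByDestGroupsSketch {
  public static void main(String[] args) {
    // Destination clause -> its spray (group-by + distinct) keys.
    Map<String, Set<String>> sprayKeysPerDest = new LinkedHashMap<String, Set<String>>();
    sprayKeysPerDest.put("insclause-0", new TreeSet<String>(Arrays.asList("key")));
    sprayKeysPerDest.put("insclause-1", new TreeSet<String>(Arrays.asList("key")));
    sprayKeysPerDest.put("insclause-2", new TreeSet<String>(Arrays.asList("key", "value")));

    // Bucket destinations by identical key sets; each bucket can reuse one ReduceSink.
    Map<Set<String>, List<String>> groups = new LinkedHashMap<Set<String>, List<String>>();
    for (Map.Entry<String, Set<String>> e : sprayKeysPerDest.entrySet()) {
      List<String> dests = groups.get(e.getValue());
      if (dests == null) {
        dests = new ArrayList<String>();
        groups.put(e.getValue(), dests);
      }
      dests.add(e.getKey());
    }
    System.out.println(groups.values()); // [[insclause-0, insclause-1], [insclause-2]]
  }
}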
private List> getCommonGroupByDestGroups(QB qb, Operator input) throws SemanticException { @@ -5915,7 +5974,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // If this is a trivial query block return if (ks.size() <= 1) { - List oneList = new ArrayList(1); + List oneList = new ArrayList(1); if (ks.size() == 1) { oneList.add(ks.first()); } @@ -5934,7 +5993,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Add the group by expressions List grpByExprs = getGroupByForClause(qbp, dest); - for (ASTNode grpByExpr: grpByExprs) { + for (ASTNode grpByExpr : grpByExprs) { ExprNodeDesc.ExprNodeDescEqualityWrapper grpByExprWrapper = new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(grpByExpr, inputRR)); if (!sprayKeys.contains(grpByExprWrapper)) { @@ -5994,7 +6053,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { List distinctExprs = new ArrayList(); - for (ASTNode distinctAggExpr: distinctAggExprs) { + for (ASTNode distinctAggExpr : distinctAggExprs) { // 0 is function name for (int i = 1; i < distinctAggExpr.getChildCount(); i++) { ASTNode parameter = (ASTNode) distinctAggExpr.getChild(i); @@ -6092,7 +6151,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Constructs a standard group by plan if: // There is no other subquery with the same group by/distinct keys or // (There are no aggregations in a representative query for the group and - // There is no group by in that representative query) or + // There is no group by in that representative query) or // The data is skewed or // The conf variable used to control combining group bys into a signle reducer is false if (commonGroupByDestGroup.size() == 1 || @@ -6111,9 +6170,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (qbp.getAggregationExprsForClause(dest).size() != 0 || getGroupByForClause(qbp, dest).size() > 0) { - //multiple distincts is not supported with skew in data + // multiple distincts is not supported with skew in data if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) && - qbp.getDistinctFuncExprsForClause(dest).size() > 1) { + qbp.getDistinctFuncExprsForClause(dest).size() > 1) { throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS. 
getMsg());
}
@@ -6122,7 +6181,23 @@
curr = insertSelectAllPlanForGroupBy(curr);
if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+ Operator rsopInNonMapSidePattern = null;
+ Operator mapSideGroupBy = null;
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) {
+ Operator nonMapSidePattern = genGroupByPlan1MR(dest, qb, curr);
+ rsopInNonMapSidePattern = (Operator) nonMapSidePattern
+ .getParentOperators().get(0);
+ curr.getChildOperators().remove(rsopInNonMapSidePattern);
+ }
curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
+ mapSideGroupBy = (Operator) ((Operator) curr.getParentOperators().get(0))
+ .getParentOperators().get(0);
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) {
+ groupbyNonMapSide2MapSide.put((ReduceSinkOperator) rsopInNonMapSidePattern,
+ (GroupByOperator) mapSideGroupBy);
+ groupbyMapSide2NonMapSide.put((GroupByOperator) mapSideGroupBy,
+ (ReduceSinkOperator) rsopInNonMapSidePattern);
+ }
} else {
curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
}
@@ -6153,24 +6228,20 @@
// It would be good to support them in the future, but till then it is better
// to throw a good semantic error instead of some crpytic error.
private void checkExpression(ASTNode input,
- ClauseType clauseType) throws SemanticException {
+ ClauseType clauseType) throws SemanticException {
int childCount = input.getChildCount();
// Columns can only exist at the top
if (input.getType() == HiveParser.TOK_TABLE_OR_COL) {
switch (clauseType) {
- case CLUSTER_BY_CLAUSE:
- throw new
- SemanticException(ErrorMsg.EXPRESSIONS_NOT_ALLOWED_CLUSTERBY.getMsg());
- case DISTRIBUTE_BY_CLAUSE:
- throw new
- SemanticException(ErrorMsg.EXPRESSIONS_NOT_ALLOWED_DISTRIBUTEBY.getMsg());
- case ORDER_BY_CLAUSE:
- throw new
- SemanticException(ErrorMsg.EXPRESSIONS_NOT_ALLOWED_ORDERBY.getMsg());
- case SORT_BY_CLAUSE:
- throw new
- SemanticException(ErrorMsg.EXPRESSIONS_NOT_ALLOWED_SORTBY.getMsg());
+ case CLUSTER_BY_CLAUSE:
+ throw new SemanticException(ErrorMsg.EXPRESSIONS_NOT_ALLOWED_CLUSTERBY.getMsg());
+ case DISTRIBUTE_BY_CLAUSE:
+ throw new SemanticException(ErrorMsg.EXPRESSIONS_NOT_ALLOWED_DISTRIBUTEBY.getMsg());
+ case ORDER_BY_CLAUSE:
+ throw new SemanticException(ErrorMsg.EXPRESSIONS_NOT_ALLOWED_ORDERBY.getMsg());
+ case SORT_BY_CLAUSE:
+ throw new SemanticException(ErrorMsg.EXPRESSIONS_NOT_ALLOWED_SORTBY.getMsg());
}
}
@@ -6183,23 +6254,23 @@
}
private void validateExpressionSkipParent(ASTNode inputExpr,
- ClauseType clauseType) throws SemanticException {
+ ClauseType clauseType) throws SemanticException {
int childCount = inputExpr.getChildCount();
if (childCount > 0) {
for (int pos = 0; pos < childCount; pos++) {
- checkExpression((ASTNode)inputExpr.getChild(pos), clauseType);
+ checkExpression((ASTNode) inputExpr.getChild(pos), clauseType);
}
}
}
private void validateExpressionHandleTableQualifier(ASTNode inputExpr,
- ClauseType clauseType) throws SemanticException {
+ ClauseType clauseType) throws SemanticException {
// If the expression is tab.column, go to the columns
// Same for value[3]
if ((inputExpr.getType() == HiveParser.DOT)
|| (inputExpr.getType() == HiveParser.LSQUARE)) {
for (int pos = 0; pos < inputExpr.getChildCount(); pos++) {
- validateExpressionHandleTableQualifier((ASTNode)inputExpr.getChild(pos), clauseType);
+
validateExpressionHandleTableQualifier((ASTNode) inputExpr.getChild(pos), clauseType); } } else { validateExpressionSkipParent(inputExpr, clauseType); @@ -6209,7 +6280,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Validate that the expression only consists of constants and columns. // Expressions are not allowed in the cluster/distribute/order/sort by list private void validateExpression(ASTNode expr, - ClauseType clauseType) throws SemanticException { + ClauseType clauseType) throws SemanticException { boolean isGrandChild = true; // The first level of children is whether it is ascending/descending @@ -6229,7 +6300,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (grandChildCount > 0) { for (int childPos = 0; childPos < grandChildCount; childPos++) { validateExpressionHandleTableQualifier( - (ASTNode)cl.getChild(childPos), clauseType); + (ASTNode) cl.getChild(childPos), clauseType); } } } @@ -6263,25 +6334,25 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // if the clause contains any expression. if (qbp.getClusterByForClause(dest) != null) { validateExpression(qbp.getClusterByForClause(dest), - ClauseType.CLUSTER_BY_CLAUSE); + ClauseType.CLUSTER_BY_CLAUSE); genReduceSink = true; } if (qbp.getDistributeByForClause(dest) != null) { validateExpression(qbp.getDistributeByForClause(dest), - ClauseType.DISTRIBUTE_BY_CLAUSE); + ClauseType.DISTRIBUTE_BY_CLAUSE); genReduceSink = true; } if (qbp.getOrderByForClause(dest) != null) { validateExpression(qbp.getOrderByForClause(dest), - ClauseType.ORDER_BY_CLAUSE); + ClauseType.ORDER_BY_CLAUSE); genReduceSink = true; } if (qbp.getSortByForClause(dest) != null) { validateExpression(qbp.getSortByForClause(dest), - ClauseType.SORT_BY_CLAUSE); + ClauseType.SORT_BY_CLAUSE); genReduceSink = true; } @@ -6352,7 +6423,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { HashMap rightmap = rightRR.getFieldMap(rightalias); // make sure the schemas of both sides are the same ASTNode tabref = qb.getAliases().isEmpty() ? null : - qb.getParseInfo().getSrcForAlias(qb.getAliases().get(0)); + qb.getParseInfo().getSrcForAlias(qb.getAliases().get(0)); if (leftmap.size() != rightmap.size()) { throw new SemanticException("Schema of both sides of union should match."); } @@ -6363,31 +6434,31 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (rInfo == null) { throw new SemanticException(generateErrorMessage(tabref, "Schema of both sides of union should match. " + rightalias - + " does not have the field " + field)); + + " does not have the field " + field)); } if (lInfo == null) { throw new SemanticException(generateErrorMessage(tabref, "Schema of both sides of union should match. " + leftalias - + " does not have the field " + field)); + + " does not have the field " + field)); } if (!lInfo.getInternalName().equals(rInfo.getInternalName())) { throw new SemanticException(generateErrorMessage(tabref, "Schema of both sides of union should match: field " + field + ":" - + " appears on the left side of the UNION at column position: " + - getPositionFromInternalName(lInfo.getInternalName()) - + ", and on the right side of the UNION at column position: " + - getPositionFromInternalName(rInfo.getInternalName()) - + ". 
Column positions should match for a UNION")); + + " appears on the left side of the UNION at column position: " + + getPositionFromInternalName(lInfo.getInternalName()) + + ", and on the right side of the UNION at column position: " + + getPositionFromInternalName(rInfo.getInternalName()) + + ". Column positions should match for a UNION")); } - //try widening coversion, otherwise fail union + // try widening coversion, otherwise fail union TypeInfo commonTypeInfo = FunctionRegistry.getCommonClassForUnionAll(lInfo.getType(), rInfo.getType()); if (commonTypeInfo == null) { throw new SemanticException(generateErrorMessage(tabref, "Schema of both sides of union should match: Column " + field - + " is of type " + lInfo.getType().getTypeName() - + " on first table and type " + rInfo.getType().getTypeName() - + " on second table")); + + " is of type " + lInfo.getType().getTypeName() + + " on first table and type " + rInfo.getType().getTypeName() + + " on second table")); } } @@ -6398,7 +6469,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ColumnInfo lInfo = lEntry.getValue(); ColumnInfo rInfo = rightmap.get(field); lInfo.setType(FunctionRegistry.getCommonClassForUnionAll(lInfo.getType(), - rInfo.getType())); + rInfo.getType())); unionoutRR.put(unionalias, field, lInfo); } @@ -6408,7 +6479,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (leftOp instanceof UnionOperator) { // make left a child of right List> child = - new ArrayList>(); + new ArrayList>(); child.add(leftOp); rightOp.setChildOperators(child); @@ -6422,7 +6493,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } else { // make right a child of left List> child = - new ArrayList>(); + new ArrayList>(); child.add(rightOp); leftOp.setChildOperators(child); @@ -6439,11 +6510,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Create a new union operator Operator unionforward = OperatorFactory .getAndMakeChild(new UnionDesc(), new RowSchema(unionoutRR - .getColumnInfos())); + .getColumnInfos())); // set union operator as child of each of leftOp and rightOp List> child = - new ArrayList>(); + new ArrayList>(); child.add(unionforward); rightOp.setChildOperators(child); @@ -6452,7 +6523,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { leftOp.setChildOperators(child); List> parent = - new ArrayList>(); + new ArrayList>(); parent.add(leftOp); parent.add(rightOp); unionforward.setParentOperators(parent); @@ -6470,9 +6541,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * expressions are provided on the TABLESAMPLE clause and the table has * clustering columns defined in it's metadata. The predicate created has the * following structure: - * + * * ((hash(expressions) & Integer.MAX_VALUE) % denominator) == numerator - * + * * @param ts * TABLESAMPLE clause information * @param bucketCols @@ -6576,10 +6647,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * if the column is a skewed column, use ColumnInfo accordingly */ ColumnInfo colInfo = new ColumnInfo(fields.get(i).getFieldName(), - TypeInfoUtils.getTypeInfoFromObjectInspector(fields.get(i) - .getFieldObjectInspector()), alias, false); + TypeInfoUtils.getTypeInfoFromObjectInspector(fields.get(i) + .getFieldObjectInspector()), alias, false); colInfo.setSkewedCol((isSkewedCol(alias, qb, fields.get(i) - .getFieldName())) ? true : false); + .getFieldName())) ? 
true : false); rwsch.put(alias, fields.get(i).getFieldName(), colInfo); } } catch (SerDeException e) { @@ -6595,9 +6666,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { TypeInfoFactory.stringTypeInfo, alias, true)); } - //put all virutal columns in RowResolver. + // put all virutal columns in RowResolver. Iterator vcs = VirtualColumn.getRegistry(conf).iterator(); - //use a list for easy cumtomize + // use a list for easy cumtomize List vcList = new ArrayList(); while (vcs.hasNext()) { VirtualColumn vc = vcs.next(); @@ -6611,7 +6682,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { setupStats(tsDesc, qb.getParseInfo(), tab, alias, rwsch); top = putOpInsertMap(OperatorFactory.get(tsDesc, - new RowSchema(rwsch.getColumnInfos())), rwsch); + new RowSchema(rwsch.getColumnInfos())), rwsch); // Add this to the list of top operators - we always start from a table // scan @@ -6645,7 +6716,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (num > den) { throw new SemanticException( ErrorMsg.BUCKETED_NUMERATOR_BIGGER_DENOMINATOR.getMsg() + " " - + tab.getTableName()); + + tab.getTableName()); } // check if a predicate is needed @@ -6688,7 +6759,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { colsEqual, alias, rwsch, qb.getMetaData(), null); tableOp = OperatorFactory.getAndMakeChild(new FilterDesc( samplePredicate, true, new sampleDesc(ts.getNumerator(), ts - .getDenominator(), tabBucketCols, true)), + .getDenominator(), tabBucketCols, true)), new RowSchema(rwsch.getColumnInfos()), top); } else { // need to add filter @@ -6728,9 +6799,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { .getBucketCols(), true, alias, rwsch, qb.getMetaData(), null); tableOp = OperatorFactory .getAndMakeChild(new FilterDesc(samplePred, true, - new sampleDesc(tsSample.getNumerator(), tsSample - .getDenominator(), tab.getBucketCols(), true)), - new RowSchema(rwsch.getColumnInfos()), top); + new sampleDesc(tsSample.getNumerator(), tsSample + .getDenominator(), tab.getBucketCols(), true)), + new RowSchema(rwsch.getColumnInfos()), top); LOG.info("No need for sample filter"); } else { // The table is not bucketed, add a dummy filter :: rand() @@ -6741,7 +6812,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { LOG.info("Need sample filter"); ExprNodeDesc randFunc = TypeCheckProcFactory.DefaultExprProcessor .getFuncExprNodeDesc("rand", new ExprNodeConstantDesc(Integer - .valueOf(460476415))); + .valueOf(460476415))); ExprNodeDesc samplePred = genSamplePredicate(tsSample, null, false, alias, rwsch, qb.getMetaData(), randFunc); tableOp = OperatorFactory.getAndMakeChild(new FilterDesc( @@ -6764,7 +6835,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private boolean isSkewedCol(String alias, QB qb, String colName) { boolean isSkewedCol = false; List skewedCols = qb.getSkewedColumnNames(alias); - for (String skewedCol:skewedCols) { + for (String skewedCol : skewedCols) { if (skewedCol.equalsIgnoreCase(colName)) { isSkewedCol = true; } @@ -6772,7 +6843,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return isSkewedCol; } - private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String alias, RowResolver rwsch) + private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String alias, + RowResolver rwsch) throws SemanticException { if (!qbp.isAnalyzeCommand()) { @@ -6805,7 +6877,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Theoretically the key 
prefix could be any unique string shared // between TableScanOperator (when publishing) and StatsTask (when aggregating). // Here we use - // table_name + partitionSec + // table_name + partitionSec // as the prefix for easy of read during explain and debugging. // Currently, partition spec can only be static partition. String k = tblName + Path.SEPARATOR; @@ -6905,7 +6977,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Generates the operator DAG needed to implement lateral views and attaches * it to the TS operator. - * + * * @param aliasToOpInfo * A mapping from a table alias to the TS operator. This function * replaces the operator mapping as necessary @@ -6936,7 +7008,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { RowResolver lvForwardRR = new RowResolver(); RowResolver source = opParseCtx.get(op).getRowResolver(); for (ColumnInfo col : source.getColumnInfos()) { - if(col.getIsVirtualCol() && col.isHiddenVirtualCol()) { + if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) { continue; } String[] tabCol = source.reverseLookup(col.getInternalName()); @@ -6953,10 +7025,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Get the all path by making a select(*). RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver(); - //Operator allPath = op; + // Operator allPath = op; Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild( - new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()), - lvForward), allPathRR); + new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()), + lvForward), allPathRR); // Get the UDTF Path QB blankQb = new QB(null, null, false); Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree @@ -6984,7 +7056,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // LVmerge.. in the above order Map colExprMap = new HashMap(); - int i=0; + int i = 0; for (ColumnInfo c : allPathRR.getColumnInfos()) { String internalName = getColumnInternalName(i); i++; @@ -7009,11 +7081,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * A helper function that gets all the columns and respective aliases in the * source and puts them into dest. It renames the internal names of the * columns based on getColumnInternalName(position). - * + * * Note that this helper method relies on RowResolver.getColumnInfos() * returning the columns in the same order as they will be passed in the * operator DAG. 
- * + * * @param source * @param dest * @param outputColNames @@ -7081,12 +7153,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, getInputs(), conf); try { - List> indexUpdateTasks = indexUpdater.generateUpdateTasks(); + List> indexUpdateTasks = + indexUpdater.generateUpdateTasks(); for (Task updateTask : indexUpdateTasks) { tsk.addDependentTask(updateTask); } } catch (HiveException e) { - console.printInfo("WARNING: could not auto-update stale indexes, indexes are not in of sync"); + console + .printInfo("WARNING: could not auto-update stale indexes, indexes are not in of sync"); } } } @@ -7144,41 +7218,41 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // The dispatcher generates the plan from the operator tree Map opRules = new LinkedHashMap(); opRules.put(new RuleRegExp(new String("R1"), - TableScanOperator.getOperatorName() + "%"), - new GenMRTableScan1()); + TableScanOperator.getOperatorName() + "%"), + new GenMRTableScan1()); opRules.put(new RuleRegExp(new String("R2"), - TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink1()); + TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink1()); opRules.put(new RuleRegExp(new String("R3"), - ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink2()); + ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink2()); opRules.put(new RuleRegExp(new String("R4"), - FileSinkOperator.getOperatorName() + "%"), - new GenMRFileSink1()); + FileSinkOperator.getOperatorName() + "%"), + new GenMRFileSink1()); opRules.put(new RuleRegExp(new String("R5"), - UnionOperator.getOperatorName() + "%"), - new GenMRUnion1()); + UnionOperator.getOperatorName() + "%"), + new GenMRUnion1()); opRules.put(new RuleRegExp(new String("R6"), - UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink3()); + UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink3()); opRules.put(new RuleRegExp(new String("R6"), - MapJoinOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink4()); + MapJoinOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink4()); opRules.put(new RuleRegExp(new String("R7"), - TableScanOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"), - MapJoinFactory.getTableScanMapJoin()); + TableScanOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"), + MapJoinFactory.getTableScanMapJoin()); opRules.put(new RuleRegExp(new String("R8"), - ReduceSinkOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"), - MapJoinFactory.getReduceSinkMapJoin()); + ReduceSinkOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"), + MapJoinFactory.getReduceSinkMapJoin()); opRules.put(new RuleRegExp(new String("R9"), - UnionOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"), - MapJoinFactory.getUnionMapJoin()); + UnionOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"), + MapJoinFactory.getUnionMapJoin()); opRules.put(new RuleRegExp(new String("R10"), - 
MapJoinOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"), - MapJoinFactory.getMapJoinMapJoin()); + MapJoinOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"), + MapJoinFactory.getMapJoinMapJoin()); opRules.put(new RuleRegExp(new String("R11"), - MapJoinOperator.getOperatorName() + "%" + SelectOperator.getOperatorName() + "%"), - MapJoinFactory.getMapJoin()); + MapJoinOperator.getOperatorName() + "%" + SelectOperator.getOperatorName() + "%"), + MapJoinFactory.getMapJoin()); // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along @@ -7238,12 +7312,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { getLeafTasks(rootTasks, leaves); assert (leaves.size() > 0); for (Task task : leaves) { - if (task instanceof StatsTask){ - //StatsTask require table to already exist - for (Task parentOfStatsTask : task.getParentTasks()){ + if (task instanceof StatsTask) { + // StatsTask require table to already exist + for (Task parentOfStatsTask : task.getParentTasks()) { parentOfStatsTask.addDependentTask(crtTblTask); } - for (Task parentOfCrtTblTask : crtTblTask.getParentTasks()){ + for (Task parentOfCrtTblTask : crtTblTask.getParentTasks()) { parentOfCrtTblTask.removeDependentTask(task); } crtTblTask.addDependentTask(task); @@ -7254,9 +7328,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } if (globalLimitCtx.isEnable() && fetchTask != null) { - int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH); - LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit()); - fetchTask.getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit()); + HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH); + LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit()); + fetchTask.getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit()); } if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) { @@ -7455,7 +7529,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } viewSelect = child; // prevent view from referencing itself - viewsExpanded.add(db.getCurrentDatabase()+"."+createVwDesc.getViewName()); + viewsExpanded.add(db.getCurrentDatabase() + "." + createVwDesc.getViewName()); } // continue analyzing from the child ASTNode. @@ -7470,7 +7544,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { LOG.info("Completed getting MetaData in Semantic Analysis"); // Save the result schema derived from the sink operator produced - // by genPlan. This has the correct column names, which clients + // by genPlan. This has the correct column names, which clients // such as JDBC would prefer instead of the c0, c1 we'll end // up with later. 
Operator sinkOp = genPlan(qb); @@ -7492,7 +7566,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToSkewedPruner); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToSkewedPruner, + groupbyNonMapSide2MapSide, groupbyMapSide2NonMapSide); Optimizer optm = new Optimizer(); optm.setPctx(pCtx); @@ -7527,8 +7602,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { int derivedColCount = derivedSchema.size(); if (explicitColCount != derivedColCount) { throw new SemanticException(generateErrorMessage( - viewSelect, - ErrorMsg.VIEW_COL_MISMATCH.getMsg())); + viewSelect, + ErrorMsg.VIEW_COL_MISMATCH.getMsg())); } } @@ -7576,19 +7651,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (createVwDesc.getPartColNames() != null) { // Make sure all partitioning columns referenced actually // exist and are in the correct order at the end - // of the list of columns produced by the view. Also move the field + // of the list of columns produced by the view. Also move the field // schema descriptors from derivedSchema to the partitioning key // descriptor. List partColNames = createVwDesc.getPartColNames(); if (partColNames.size() > derivedSchema.size()) { - throw new SemanticException( + throw new SemanticException( ErrorMsg.VIEW_PARTITION_MISMATCH.getMsg()); } // Get the partition columns from the end of derivedSchema. List partitionColumns = derivedSchema.subList( - derivedSchema.size() - partColNames.size(), - derivedSchema.size()); + derivedSchema.size() - partColNames.size(), + derivedSchema.size()); // Verify that the names match the PARTITIONED ON clause. Iterator colNameIter = partColNames.iterator(); @@ -7598,20 +7673,20 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { FieldSchema fieldSchema = schemaIter.next(); if (!fieldSchema.getName().equals(colName)) { throw new SemanticException( - ErrorMsg.VIEW_PARTITION_MISMATCH.getMsg()); + ErrorMsg.VIEW_PARTITION_MISMATCH.getMsg()); } } - // Boundary case: require at least one non-partitioned column + // Boundary case: require at least one non-partitioned column // for consistency with tables. if (partColNames.size() == derivedSchema.size()) { - throw new SemanticException( + throw new SemanticException( ErrorMsg.VIEW_PARTITION_TOTAL.getMsg()); } // Now make a copy. createVwDesc.setPartCols( - new ArrayList(partitionColumns)); + new ArrayList(partitionColumns)); // Finally, remove the partition columns from the end of derivedSchema. // (Clearing the subList writes through to the underlying @@ -7640,7 +7715,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Generates an expression node descriptor for the expression passed in the * arguments. This function uses the row resolver and the metadata information * that are passed as arguments to resolve the column names to internal names. - * + * * @param expr * The expression * @param input @@ -7660,7 +7735,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Generates an expression node descriptor for the expression passed in the * arguments. This function uses the row resolver and the metadata information * that are passed as arguments to resolve the column names to internal names. 
- * + * * @param expr * The expression * @param input @@ -7672,7 +7747,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { */ @SuppressWarnings("nls") public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, - TypeCheckCtx tcCtx) throws SemanticException { + TypeCheckCtx tcCtx) throws SemanticException { // We recursively create the exprNodeDesc. Base cases: when we encounter // a column ref, we convert that into an exprNodeColumnDesc; when we // encounter @@ -7692,11 +7767,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { .getIsVirtualCol(), colInfo.isSkewedCol()); } - // Create the walker and the rules dispatcher. + // Create the walker and the rules dispatcher. tcCtx.setUnparseTranslator(unparseTranslator); HashMap nodeOutputs = - TypeCheckProcFactory.genExprNode(expr, tcCtx); + TypeCheckProcFactory.genExprNode(expr, tcCtx); ExprNodeDesc desc = (ExprNodeDesc) nodeOutputs.get(expr); if (desc == null) { throw new SemanticException(tcCtx.getError()); @@ -7737,7 +7812,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { public void validate() throws SemanticException { LOG.debug("validation start"); // Validate inputs and outputs have right protectmode to execute the query - for (ReadEntity readEntity: getInputs()) { + for (ReadEntity readEntity : getInputs()) { ReadEntity.Type type = readEntity.getType(); if (type != ReadEntity.Type.TABLE && @@ -7754,22 +7829,22 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (tbl.isOffline()) { throw new SemanticException( ErrorMsg.OFFLINE_TABLE_OR_PARTITION.getMsg( - "Table " + tbl.getTableName())); + "Table " + tbl.getTableName())); } if (type == ReadEntity.Type.PARTITION && p != null && p.isOffline()) { throw new SemanticException( ErrorMsg.OFFLINE_TABLE_OR_PARTITION.getMsg( - "Table " + tbl.getTableName() + - " Partition " + p.getName())); + "Table " + tbl.getTableName() + + " Partition " + p.getName())); } } - for (WriteEntity writeEntity: getOutputs()) { + for (WriteEntity writeEntity : getOutputs()) { WriteEntity.Type type = writeEntity.getType(); - if(type == WriteEntity.Type.PARTITION || type == WriteEntity.Type.DUMMYPARTITION) { + if (type == WriteEntity.Type.PARTITION || type == WriteEntity.Type.DUMMYPARTITION) { String conflictingArchive; try { Partition usedp = writeEntity.getPartition(); @@ -7782,7 +7857,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } catch (HiveException e) { throw new SemanticException(e); } - if(conflictingArchive != null) { + if (conflictingArchive != null) { String message = String.format("Insert conflict with existing archive: %s", conflictingArchive); throw new SemanticException(message); @@ -7817,11 +7892,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { throw new SemanticException(e); } - if (type == WriteEntity.Type.PARTITION && p!=null && p.isOffline()) { + if (type == WriteEntity.Type.PARTITION && p != null && p.isOffline()) { throw new SemanticException( ErrorMsg.OFFLINE_TABLE_OR_PARTITION.getMsg( - " Table " + tbl.getTableName() + - " Partition " + p.getName())); + " Table " + tbl.getTableName() + + " Partition " + p.getName())); } } @@ -7833,11 +7908,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (tbl.isOffline()) { throw new SemanticException( ErrorMsg.OFFLINE_TABLE_OR_PARTITION.getMsg( - "Table " + tbl.getTableName())); + "Table " + tbl.getTableName())); } } - boolean reworkMapredWork = HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_REWORK_MAPREDWORK); + boolean 
reworkMapredWork = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_REWORK_MAPREDWORK); // validate all tasks for (Task rootTask : rootTasks) { @@ -7867,7 +7942,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Add default properties for table property. If a default parameter exists * in the tblProp, the value in tblProp will be kept. - * @param table property map + * + * @param table + * property map * @return Modified table property map */ private Map addDefaultProperties(Map tblProp) { @@ -7879,7 +7956,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } String paraString = HiveConf.getVar(conf, ConfVars.NEWTABLEDEFAULTPARA); if (paraString != null && !paraString.isEmpty()) { - for (String keyValuePair: paraString.split(",")) { + for (String keyValuePair : paraString.split(",")) { String[] keyValue = keyValuePair.split("=", 2); if (keyValue.length != 2) { continue; @@ -7902,7 +7979,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { */ private ASTNode analyzeCreateTable(ASTNode ast, QB qb) throws SemanticException { - String tableName = getUnescapedName((ASTNode)ast.getChild(0)); + String tableName = getUnescapedName((ASTNode) ast.getChild(0)); String likeTableName = null; List cols = new ArrayList(); List partCols = new ArrayList(); @@ -7921,7 +7998,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { int command_type = CREATE_TABLE; List skewedColNames = new ArrayList(); List> skewedValues = new ArrayList>(); - Map, String> listBucketColValuesMapping = new HashMap, String>(); + new HashMap, String>(); RowFormatParams rowFormatParams = new RowFormatParams(); StorageFormat storageFormat = new StorageFormat(); @@ -7951,7 +8028,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { break; case HiveParser.TOK_LIKETABLE: if (child.getChildCount() > 0) { - likeTableName = getUnescapedName((ASTNode)child.getChild(0)); + likeTableName = getUnescapedName((ASTNode) child.getChild(0)); if (likeTableName != null) { if (command_type == CTAS) { throw new SemanticException(ErrorMsg.CTAS_CTLT_COEXISTENCE @@ -8048,38 +8125,38 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } else { ASTNode vAstNode = (ASTNode) vNode; switch (vAstNode.getToken().getType()) { - case HiveParser.TOK_TABCOLVALUE: - for (String str : getSkewedColumnValuesFromASTNode(vAstNode)) { - List sList = new ArrayList(Arrays.asList(str)); - skewedValues.add(sList); - } - break; - case HiveParser.TOK_TABCOLVALUE_PAIR: - ArrayList vLNodes = vAstNode.getChildren(); - for (Node node : vLNodes) { - if ( ((ASTNode) node).getToken().getType() != HiveParser.TOK_TABCOLVALUES) { + case HiveParser.TOK_TABCOLVALUE: + for (String str : getSkewedColumnValuesFromASTNode(vAstNode)) { + List sList = new ArrayList(Arrays.asList(str)); + skewedValues.add(sList); + } + break; + case HiveParser.TOK_TABCOLVALUE_PAIR: + ArrayList vLNodes = vAstNode.getChildren(); + for (Node node : vLNodes) { + if (((ASTNode) node).getToken().getType() != HiveParser.TOK_TABCOLVALUES) { + throw new SemanticException( + ErrorMsg.CREATE_SKEWED_TABLE_NO_COLUMN_VALUE.getMsg()); + } else { + Tree leafVNode = ((ASTNode) node).getChild(0); + if (leafVNode == null) { throw new SemanticException( ErrorMsg.CREATE_SKEWED_TABLE_NO_COLUMN_VALUE.getMsg()); } else { - Tree leafVNode = ((ASTNode) node).getChild(0); - if (leafVNode == null) { + ASTNode lVAstNode = (ASTNode) leafVNode; + if (lVAstNode.getToken().getType() != HiveParser.TOK_TABCOLVALUE) { throw new SemanticException( 
ErrorMsg.CREATE_SKEWED_TABLE_NO_COLUMN_VALUE.getMsg()); } else { - ASTNode lVAstNode = (ASTNode) leafVNode; - if (lVAstNode.getToken().getType() != HiveParser.TOK_TABCOLVALUE) { - throw new SemanticException( - ErrorMsg.CREATE_SKEWED_TABLE_NO_COLUMN_VALUE.getMsg()); - } else { - skewedValues.add(new ArrayList( - getSkewedColumnValuesFromASTNode(lVAstNode))); - } + skewedValues.add(new ArrayList( + getSkewedColumnValuesFromASTNode(lVAstNode))); } } } - break; - default: - break; + } + break; + default: + break; } } break; @@ -8113,12 +8190,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { case CREATE_TABLE: // REGULAR CREATE TABLE DDL tblProps = addDefaultProperties(tblProps); - crtTblDesc = new CreateTableDesc(tableName, isExt, cols, partCols, - bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, rowFormatParams.fieldEscape, - rowFormatParams.collItemDelim, rowFormatParams.mapKeyDelim, rowFormatParams.lineDelim, comment, - storageFormat.inputFormat, storageFormat.outputFormat, location, shared.serde, - storageFormat.storageHandler, shared.serdeProps, tblProps, ifNotExists, skewedColNames, - skewedValues); + crtTblDesc = + new CreateTableDesc(tableName, isExt, cols, partCols, + bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, + rowFormatParams.fieldEscape, + rowFormatParams.collItemDelim, rowFormatParams.mapKeyDelim, + rowFormatParams.lineDelim, comment, + storageFormat.inputFormat, storageFormat.outputFormat, location, shared.serde, + storageFormat.storageHandler, shared.serdeProps, tblProps, ifNotExists, + skewedColNames, + skewedValues); crtTblDesc.validate(); // outputs is empty, which means this create table happens in the current @@ -8144,7 +8225,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { try { Table dumpTable = db.newTable(tableName); databaseName = dumpTable.getDbName(); - if (null == db.getDatabase(dumpTable.getDbName()) ) { + if (null == db.getDatabase(dumpTable.getDbName())) { throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(dumpTable.getDbName())); } if (null != db.getTable(dumpTable.getDbName(), dumpTable.getTableName(), false)) { @@ -8156,11 +8237,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { tblProps = addDefaultProperties(tblProps); - crtTblDesc = new CreateTableDesc(databaseName, tableName, isExt, cols, partCols, - bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, rowFormatParams.fieldEscape, - rowFormatParams.collItemDelim, rowFormatParams.mapKeyDelim, rowFormatParams.lineDelim, comment, storageFormat.inputFormat, - storageFormat.outputFormat, location, shared.serde, storageFormat.storageHandler, shared.serdeProps, - tblProps, ifNotExists, skewedColNames, skewedValues); + crtTblDesc = + new CreateTableDesc(databaseName, tableName, isExt, cols, partCols, + bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, + rowFormatParams.fieldEscape, + rowFormatParams.collItemDelim, rowFormatParams.mapKeyDelim, + rowFormatParams.lineDelim, comment, storageFormat.inputFormat, + storageFormat.outputFormat, location, shared.serde, storageFormat.storageHandler, + shared.serdeProps, + tblProps, ifNotExists, skewedColNames, skewedValues); qb.setTableDesc(crtTblDesc); SessionState.get().setCommandType(HiveOperation.CREATETABLE_AS_SELECT); @@ -8174,7 +8259,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Analyze list bucket column names - * + * * @param skewedColNames * @param child * @return @@ -8198,10 +8283,11 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer { /** * Given a ASTNode, return list of values. - * + * * use case: - * create table xyz list bucketed (col1) with skew (1,2,5) - * AST Node is for (1,2,5) + * create table xyz list bucketed (col1) with skew (1,2,5) + * AST Node is for (1,2,5) + * * @param ast * @return */ @@ -8218,7 +8304,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private ASTNode analyzeCreateView(ASTNode ast, QB qb) throws SemanticException { - String tableName = getUnescapedName((ASTNode)ast.getChild(0)); + String tableName = getUnescapedName((ASTNode) ast.getChild(0)); List cols = null; boolean ifNotExists = false; boolean orReplace = false; @@ -8259,12 +8345,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } - if (ifNotExists && orReplace){ + if (ifNotExists && orReplace) { throw new SemanticException("Can't combine IF NOT EXISTS and OR REPLACE."); } createVwDesc = new CreateViewDesc( - tableName, cols, comment, tblProps, partColNames, ifNotExists, orReplace); + tableName, cols, comment, tblProps, partColNames, ifNotExists, orReplace); unparseTranslator.enable(); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), createVwDesc), conf)); @@ -8274,7 +8360,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private void decideExecMode(List> rootTasks, Context ctx, GlobalLimitCtx globalLimitCtx) - throws SemanticException { + throws SemanticException { // bypass for explain queries for now if (ctx.getExplain()) { @@ -8288,20 +8374,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } final Context lCtx = ctx; - PathFilter p = new PathFilter () { - public boolean accept(Path file) { - return !lCtx.isMRTmpFileURI(file.toUri().getPath()); - } - }; + PathFilter p = new PathFilter() { + @Override + public boolean accept(Path file) { + return !lCtx.isMRTmpFileURI(file.toUri().getPath()); + } + }; List mrtasks = Utilities.getMRTasks(rootTasks); // map-reduce jobs will be run locally based on data size // first find out if any of the jobs needs to run non-locally boolean hasNonLocalJob = false; - for (ExecDriver mrtask: mrtasks) { + for (ExecDriver mrtask : mrtasks) { try { ContentSummary inputSummary = Utilities.getInputSummary - (ctx, (MapredWork)mrtask.getWork(), p); + (ctx, mrtask.getWork(), p); int numReducers = getNumberOfReducers(mrtask.getWork(), conf); long estimatedInput; @@ -8324,8 +8411,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (LOG.isDebugEnabled()) { LOG.debug("Task: " + mrtask.getId() + ", Summary: " + - inputSummary.getLength() + "," + inputSummary.getFileCount() + "," - + numReducers + ", estimated Input: " + estimatedInput); + inputSummary.getLength() + "," + inputSummary.getFileCount() + "," + + numReducers + ", estimated Input: " + estimatedInput); } if (MapRedTask.isEligibleForLocalMode(conf, numReducers, @@ -8336,15 +8423,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { mrtask.setLocalMode(true); } } catch (IOException e) { - throw new SemanticException (e); + throw new SemanticException(e); } } - if(!hasNonLocalJob) { + if (!hasNonLocalJob) { // Entire query can be run locally. // Save the current tracker value and restore it when done. 
ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf)); - ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf,"local"); + ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, "local"); console.printInfo("Automatically selecting local only mode for query"); // If all the tasks can be run locally, we can use local disk for diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java new file mode 100644 index 0000000..2901b23 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.List; + +/** + * BaseReduceSinkDesc. + * + */ +@Explain(displayName = "Base Reduce Output Operator") +public class BaseReduceSinkDesc extends AbstractOperatorDesc { + private static final long serialVersionUID = 1L; + /** + * Key columns are passed to reducer in the "key". + */ + protected java.util.ArrayList keyCols; + protected java.util.ArrayList outputKeyColumnNames; + protected List> distinctColumnIndices; + /** + * Value columns are passed to reducer in the "value". + */ + protected java.util.ArrayList valueCols; + protected java.util.ArrayList outputValueColumnNames; + /** + * Describe how to serialize the key. + */ + protected TableDesc keySerializeInfo; + /** + * Describe how to serialize the value. + */ + protected TableDesc valueSerializeInfo; + + /** + * The tag for this reducesink descriptor. + */ + protected int tag; + + /** + * Number of distribution keys. + */ + protected int numDistributionKeys; + + /** + * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). + * Partition columns decide the reducer that the current row goes to. + * Partition columns are not passed to reducer. 
+ */ + protected java.util.ArrayList partitionCols; + + protected int numReducers; + + public BaseReduceSinkDesc() { + } + + public java.util.ArrayList getOutputKeyColumnNames() { + return outputKeyColumnNames; + } + + public void setOutputKeyColumnNames( + java.util.ArrayList outputKeyColumnNames) { + this.outputKeyColumnNames = outputKeyColumnNames; + } + + public java.util.ArrayList getOutputValueColumnNames() { + return outputValueColumnNames; + } + + public void setOutputValueColumnNames( + java.util.ArrayList outputValueColumnNames) { + this.outputValueColumnNames = outputValueColumnNames; + } + + @Explain(displayName = "key expressions") + public java.util.ArrayList getKeyCols() { + return keyCols; + } + + public void setKeyCols(final java.util.ArrayList keyCols) { + this.keyCols = keyCols; + } + + public int getNumDistributionKeys() { + return this.numDistributionKeys; + } + + public void setNumDistributionKeys(int numKeys) { + this.numDistributionKeys = numKeys; + } + + @Explain(displayName = "value expressions") + public java.util.ArrayList getValueCols() { + return valueCols; + } + + public void setValueCols(final java.util.ArrayList valueCols) { + this.valueCols = valueCols; + } + + @Explain(displayName = "Map-reduce partition columns") + public java.util.ArrayList getPartitionCols() { + return partitionCols; + } + + public void setPartitionCols( + final java.util.ArrayList partitionCols) { + this.partitionCols = partitionCols; + } + + @Explain(displayName = "tag") + public int getTag() { + return tag; + } + + public void setTag(int tag) { + this.tag = tag; + } + + /** + * Returns the number of reducers for the map-reduce job. -1 means to decide + * the number of reducers at runtime. This enables Hive to estimate the number + * of reducers based on the map-reduce input data size, which is only + * available right before we start the map-reduce job. + */ + public int getNumReducers() { + return numReducers; + } + + public void setNumReducers(int numReducers) { + this.numReducers = numReducers; + } + + public TableDesc getKeySerializeInfo() { + return keySerializeInfo; + } + + public void setKeySerializeInfo(TableDesc keySerializeInfo) { + this.keySerializeInfo = keySerializeInfo; + } + + public TableDesc getValueSerializeInfo() { + return valueSerializeInfo; + } + + public void setValueSerializeInfo(TableDesc valueSerializeInfo) { + this.valueSerializeInfo = valueSerializeInfo; + } + + /** + * Returns the sort order of the key columns. + * + * @return null, which means ascending order for all key columns, or a String + * of the same length as key columns, that consists of only "+" + * (ascending order) and "-" (descending order). 
+ */ + @Explain(displayName = "sort order") + public String getOrder() { + return keySerializeInfo.getProperties().getProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); + } + + public void setOrder(String orderStr) { + keySerializeInfo.getProperties().setProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, + orderStr); + } + + public List> getDistinctColumnIndices() { + return distinctColumnIndices; + } + + public void setDistinctColumnIndices( + List> distinctColumnIndices) { + this.distinctColumnIndices = distinctColumnIndices; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java new file mode 100644 index 0000000..f1793be --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; + + +/** + * Correlation composite operator Descriptor implementation. + * + */ +@Explain(displayName = "Correlation Composite Operator") +public class CorrelationCompositeDesc extends AbstractOperatorDesc { + + private static final long serialVersionUID = 1L; + + private ReduceSinkOperator correspondingReduceSinkOperator; + + public CorrelationCompositeDesc() { + + } + + public CorrelationCompositeDesc(ReduceSinkOperator correspondingReduceSinkOperator) { + this.correspondingReduceSinkOperator = correspondingReduceSinkOperator; + } + + public void setCorrespondingReduceSinkOperator( + ReduceSinkOperator correspondingReduceSinkOperator) { + this.correspondingReduceSinkOperator = correspondingReduceSinkOperator; + } + + public ReduceSinkOperator getCorrespondingReduceSinkOperator() { + return correspondingReduceSinkOperator; + } + + private int[] allOperationPathTags; + + public void setAllOperationPathTags(int[] allOperationPathTags) { + this.allOperationPathTags = allOperationPathTags; + } + + public int[] getAllOperationPathTags() { + return allOperationPathTags; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java new file mode 100644 index 0000000..317be82 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.List; + +/** + * CorrelationLocalSimulativeReduceSinkDesc. + * + */ +@Explain(displayName = "Correlation Local Simulative Reduce Output Operator") +public class CorrelationLocalSimulativeReduceSinkDesc extends BaseReduceSinkDesc { + private static final long serialVersionUID = 1L; + + public CorrelationLocalSimulativeReduceSinkDesc() { + } + + public CorrelationLocalSimulativeReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this.keyCols = keyCols; + this.numDistributionKeys = numDistributionKeys; + this.valueCols = valueCols; + this.outputKeyColumnNames = outputKeyColumnNames; + this.outputValueColumnNames = outputValueColumnNames; + this.tag = tag; + this.numReducers = numReducers; + this.partitionCols = partitionCols; + this.keySerializeInfo = keySerializeInfo; + this.valueSerializeInfo = valueSerializeInfo; + this.distinctColumnIndices = distinctColumnIndices; + } + + public CorrelationLocalSimulativeReduceSinkDesc(ReduceSinkDesc reduceSinkDesc){ + this.keyCols = reduceSinkDesc.getKeyCols(); + this.numDistributionKeys = reduceSinkDesc.getNumDistributionKeys(); + this.valueCols = reduceSinkDesc.getValueCols(); + this.outputKeyColumnNames = reduceSinkDesc.getOutputKeyColumnNames(); + this.outputValueColumnNames = reduceSinkDesc.getOutputValueColumnNames(); + this.tag = reduceSinkDesc.getTag(); + this.numReducers = reduceSinkDesc.getNumReducers(); + this.partitionCols = reduceSinkDesc.getPartitionCols(); + this.keySerializeInfo = reduceSinkDesc.getKeySerializeInfo(); + this.valueSerializeInfo = reduceSinkDesc.getValueSerializeInfo(); + this.distinctColumnIndices = reduceSinkDesc.getDistinctColumnIndices(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java new file mode 100644 index 0000000..a007254 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map.Entry; + + +/** + * Correlation dispatch operator Descriptor implementation. + * + */ +@Explain(displayName = "Correlation Dispatch Operator") +public class CorrelationReducerDispatchDesc extends AbstractOperatorDesc { + + private static final long serialVersionUID = 1L; + + + private HashMap>> dispatchConf; + private HashMap>> dispatchValueSelectDescConf; + private HashMap>> dispatchKeySelectDescConf; + + public CorrelationReducerDispatchDesc(){ + this.dispatchConf = new HashMap>>(); + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + + } + + public CorrelationReducerDispatchDesc(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + for(Entry>> entry: this.dispatchConf.entrySet()){ + HashMap> tmp = new HashMap>(); + for(Integer child: entry.getValue().keySet()){ + tmp.put(child, new ArrayList()); + tmp.get(child).add(new SelectDesc(true)); + } + this.dispatchValueSelectDescConf.put(entry.getKey(), tmp); + this.dispatchKeySelectDescConf.put(entry.getKey(), tmp); + } + } + + public CorrelationReducerDispatchDesc(HashMap>> dispatchConf, + HashMap>> dispatchKeySelectDescConf, + HashMap>> dispatchValueSelectDescConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public void setDispatchConf(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + } + + public HashMap>> getDispatchConf(){ + return this.dispatchConf; + } + + public void setDispatchValueSelectDescConf(HashMap>> dispatchValueSelectDescConf){ + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchValueSelectDescConf(){ + return this.dispatchValueSelectDescConf; + } + + public void setDispatchKeySelectDescConf(HashMap>> dispatchKeySelectDescConf){ + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return this.dispatchKeySelectDescConf; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java index 5f38bf2..7c40499 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java @@ -71,6 +71,7 @@ public class MapredWork extends AbstractOperatorDesc { private Long minSplitSizePerRack; private boolean needsTagging; + private boolean needsOperationPathTagging; private boolean hadoopSupportsSplittable; private MapredLocalWork mapLocalWork; @@ -339,6 +340,16 @@ public class MapredWork extends AbstractOperatorDesc { this.needsTagging = needsTagging; } + //TODO: enable the annotation shown below + // @Explain(displayName = "Needs Operation Paths Tagging", normalExplain = false) + public boolean getNeedsOperationPathTagging() { + return 
needsOperationPathTagging; + } + + public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) { + this.needsOperationPathTagging = needsOperationPathTagging; + } + public boolean getHadoopSupportsSplittable() { return hadoopSupportsSplittable; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 16eb125..f8b572b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -27,58 +27,44 @@ import java.util.List; * */ @Explain(displayName = "Reduce Output Operator") -public class ReduceSinkDesc extends AbstractOperatorDesc { +public class ReduceSinkDesc extends BaseReduceSinkDesc { private static final long serialVersionUID = 1L; - /** - * Key columns are passed to reducer in the "key". - */ - private java.util.ArrayList keyCols; - private java.util.ArrayList outputKeyColumnNames; - private List> distinctColumnIndices; - /** - * Value columns are passed to reducer in the "value". - */ - private java.util.ArrayList valueCols; - private java.util.ArrayList outputValueColumnNames; - /** - * Describe how to serialize the key. - */ - private TableDesc keySerializeInfo; - /** - * Describe how to serialize the value. - */ - private TableDesc valueSerializeInfo; - /** - * The tag for this reducesink descriptor. - */ - private int tag; + private boolean needsOperationPathTagging; - /** - * Number of distribution keys. - */ - private int numDistributionKeys; - - /** - * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). - * Partition columns decide the reducer that the current row goes to. - * Partition columns are not passed to reducer. - */ - private java.util.ArrayList partitionCols; + public boolean getNeedsOperationPathTagging() { + return needsOperationPathTagging; + } - private int numReducers; + public void setNeedsOperationPathTagging(boolean isOperationPathTagged) { + this.needsOperationPathTagging = isOperationPathTagged; + } public ReduceSinkDesc() { } public ReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this(keyCols, numDistributionKeys, valueCols, + outputKeyColumnNames, distinctColumnIndices, outputValueColumnNames, tag, + partitionCols, numReducers, keySerializeInfo, valueSerializeInfo, false); + } + + public ReduceSinkDesc(java.util.ArrayList keyCols, int numDistributionKeys, java.util.ArrayList valueCols, java.util.ArrayList outputKeyColumnNames, List> distinctColumnIndices, java.util.ArrayList outputValueColumnNames, int tag, java.util.ArrayList partitionCols, int numReducers, - final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo, + boolean needsOperationPathTagging) { this.keyCols = keyCols; this.numDistributionKeys = numDistributionKeys; this.valueCols = valueCols; @@ -90,6 +76,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { this.keySerializeInfo = keySerializeInfo; this.valueSerializeInfo = valueSerializeInfo; this.distinctColumnIndices = distinctColumnIndices; + this.needsOperationPathTagging = needsOperationPathTagging; } @Override @@ -112,127 +99,7 @@ 
public class ReduceSinkDesc extends AbstractOperatorDesc { desc.setPartitionCols((ArrayList) getPartitionCols().clone()); desc.setKeySerializeInfo((TableDesc) getKeySerializeInfo().clone()); desc.setValueSerializeInfo((TableDesc) getValueSerializeInfo().clone()); + desc.setNeedsOperationPathTagging(needsOperationPathTagging); return desc; } - - public java.util.ArrayList getOutputKeyColumnNames() { - return outputKeyColumnNames; - } - - public void setOutputKeyColumnNames( - java.util.ArrayList outputKeyColumnNames) { - this.outputKeyColumnNames = outputKeyColumnNames; - } - - public java.util.ArrayList getOutputValueColumnNames() { - return outputValueColumnNames; - } - - public void setOutputValueColumnNames( - java.util.ArrayList outputValueColumnNames) { - this.outputValueColumnNames = outputValueColumnNames; - } - - @Explain(displayName = "key expressions") - public java.util.ArrayList getKeyCols() { - return keyCols; - } - - public void setKeyCols(final java.util.ArrayList keyCols) { - this.keyCols = keyCols; - } - - public int getNumDistributionKeys() { - return this.numDistributionKeys; - } - - public void setNumDistributionKeys(int numKeys) { - this.numDistributionKeys = numKeys; - } - - @Explain(displayName = "value expressions") - public java.util.ArrayList getValueCols() { - return valueCols; - } - - public void setValueCols(final java.util.ArrayList valueCols) { - this.valueCols = valueCols; - } - - @Explain(displayName = "Map-reduce partition columns") - public java.util.ArrayList getPartitionCols() { - return partitionCols; - } - - public void setPartitionCols( - final java.util.ArrayList partitionCols) { - this.partitionCols = partitionCols; - } - - @Explain(displayName = "tag") - public int getTag() { - return tag; - } - - public void setTag(int tag) { - this.tag = tag; - } - - /** - * Returns the number of reducers for the map-reduce job. -1 means to decide - * the number of reducers at runtime. This enables Hive to estimate the number - * of reducers based on the map-reduce input data size, which is only - * available right before we start the map-reduce job. - */ - public int getNumReducers() { - return numReducers; - } - - public void setNumReducers(int numReducers) { - this.numReducers = numReducers; - } - - public TableDesc getKeySerializeInfo() { - return keySerializeInfo; - } - - public void setKeySerializeInfo(TableDesc keySerializeInfo) { - this.keySerializeInfo = keySerializeInfo; - } - - public TableDesc getValueSerializeInfo() { - return valueSerializeInfo; - } - - public void setValueSerializeInfo(TableDesc valueSerializeInfo) { - this.valueSerializeInfo = valueSerializeInfo; - } - - /** - * Returns the sort order of the key columns. - * - * @return null, which means ascending order for all key columns, or a String - * of the same length as key columns, that consists of only "+" - * (ascending order) and "-" (descending order). 
- */
-  @Explain(displayName = "sort order")
-  public String getOrder() {
-    return keySerializeInfo.getProperties().getProperty(
-        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER);
-  }
-
-  public void setOrder(String orderStr) {
-    keySerializeInfo.getProperties().setProperty(
-        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER,
-        orderStr);
-  }
-
-  public List> getDistinctColumnIndices() {
-    return distinctColumnIndices;
-  }
-
-  public void setDistinctColumnIndices(
-      List> distinctColumnIndices) {
-    this.distinctColumnIndices = distinctColumnIndices;
-  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 9a95efd..449125c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -50,6 +50,8 @@ public class TableScanDesc extends AbstractOperatorDesc {
   private boolean gatherStats;
   private boolean statsReliable;
 
+  private boolean forwardRowNumber = false;
+
   private ExprNodeDesc filterExpr;
 
   public static final String FILTER_EXPR_CONF_STR =
@@ -103,6 +105,14 @@ public class TableScanDesc extends AbstractOperatorDesc {
     return partColumns;
   }
 
+  public boolean isForwardRowNumber() {
+    return forwardRowNumber;
+  }
+
+  public void setForwardRowNumber(boolean forwardRowNumber) {
+    this.forwardRowNumber = forwardRowNumber;
+  }
+
   public void setGatherStats(boolean gatherStats) {
     this.gatherStats = gatherStats;
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index 142f040..ff54beb 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -277,6 +277,7 @@ public class TestExecDriver extends TestCase {
   private void populateMapRedPlan3(Table src, Table src2) throws SemanticException {
     mr.setNumReduceTasks(Integer.valueOf(5));
     mr.setNeedsTagging(true);
+    mr.setNeedsOperationPathTagging(false);
     ArrayList outputColumns = new ArrayList();
     for (int i = 0; i < 2; i++) {
       outputColumns.add("_col" + i);
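The SemanticAnalyzer change earlier in this patch pairs up the two alternative group-by plan shapes whenever hive.optimize.correlation is on: the ReduceSinkOperator produced by genGroupByPlan1MR is recorded against the map-side GroupByOperator produced by genGroupByPlanMapAggr1MR, once in groupbyNonMapSide2MapSide and once in groupbyMapSide2NonMapSide, so the correlation optimizer can later reach either plan shape from its twin. Below is a minimal, self-contained sketch of that bidirectional bookkeeping; NonMapSideSink, MapSideGroupBy, BidiPlanRegistry and CorrelationBookkeepingSketch are illustrative stand-ins, not classes from the patch.

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins for ReduceSinkOperator and GroupByOperator; not Hive classes.
class NonMapSideSink {
  final String id;
  NonMapSideSink(String id) { this.id = id; }
}

class MapSideGroupBy {
  final String id;
  MapSideGroupBy(String id) { this.id = id; }
}

// A minimal bidirectional registry, mirroring the paired HashMaps the patch keeps
// (groupbyNonMapSide2MapSide / groupbyMapSide2NonMapSide) so either plan shape can
// be looked up from the other later on.
class BidiPlanRegistry {
  private final Map<NonMapSideSink, MapSideGroupBy> nonMapSide2MapSide =
      new HashMap<NonMapSideSink, MapSideGroupBy>();
  private final Map<MapSideGroupBy, NonMapSideSink> mapSide2NonMapSide =
      new HashMap<MapSideGroupBy, NonMapSideSink>();

  // Register both directions in one step so the two views cannot diverge.
  void register(NonMapSideSink sink, MapSideGroupBy groupBy) {
    nonMapSide2MapSide.put(sink, groupBy);
    mapSide2NonMapSide.put(groupBy, sink);
  }

  MapSideGroupBy mapSideFor(NonMapSideSink sink) {
    return nonMapSide2MapSide.get(sink);
  }

  NonMapSideSink nonMapSideFor(MapSideGroupBy groupBy) {
    return mapSide2NonMapSide.get(groupBy);
  }
}

public class CorrelationBookkeepingSketch {
  public static void main(String[] args) {
    boolean optimizeCorrelation = true; // stands in for hive.optimize.correlation

    BidiPlanRegistry registry = new BidiPlanRegistry();
    NonMapSideSink sink = new NonMapSideSink("RS from genGroupByPlan1MR");
    MapSideGroupBy groupBy = new MapSideGroupBy("GBY from genGroupByPlanMapAggr1MR");
    if (optimizeCorrelation) {
      registry.register(sink, groupBy);
    }
    MapSideGroupBy twin = registry.mapSideFor(sink);
    System.out.println("map-side twin of '" + sink.id + "': "
        + (twin == null ? "none" : twin.id));
  }
}

Registering both directions in a single call is what keeps the two maps consistent; in the patch the two put() calls are likewise guarded by the same hive.optimize.correlation check.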
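The plan-descriptor part of the patch pulls the state shared by every reduce-sink flavour (key and value columns, serialization info, tag, partition columns, reducer count) up into BaseReduceSinkDesc; ReduceSinkDesc then only adds needsOperationPathTagging, and CorrelationLocalSimulativeReduceSinkDesc can be built straight from an existing ReduceSinkDesc by copying the inherited fields. The following compact sketch shows that pull-up plus copy-constructor shape with simplified stand-in classes (BaseSinkDescSketch, SinkDescSketch, LocalSimulativeSinkDescSketch, DescriptorPullUpSketch) rather than the real Hive descriptors.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Shared state lives in the base class, as with BaseReduceSinkDesc in the patch.
abstract class BaseSinkDescSketch {
  protected List<String> keyCols = new ArrayList<String>();
  protected List<String> valueCols = new ArrayList<String>();
  protected int numReducers = -1; // -1 means decide at runtime
}

// Stands in for ReduceSinkDesc: the only state it adds is the tagging flag.
class SinkDescSketch extends BaseSinkDescSketch {
  private final boolean needsOperationPathTagging;

  SinkDescSketch(List<String> keyCols, List<String> valueCols, int numReducers,
      boolean needsOperationPathTagging) {
    this.keyCols = keyCols;
    this.valueCols = valueCols;
    this.numReducers = numReducers;
    this.needsOperationPathTagging = needsOperationPathTagging;
  }

  boolean getNeedsOperationPathTagging() {
    return needsOperationPathTagging;
  }
}

// Stands in for CorrelationLocalSimulativeReduceSinkDesc(ReduceSinkDesc): a sibling
// subclass initialized from an ordinary sink by copying the inherited fields.
class LocalSimulativeSinkDescSketch extends BaseSinkDescSketch {
  LocalSimulativeSinkDescSketch(SinkDescSketch rs) {
    this.keyCols = rs.keyCols;
    this.valueCols = rs.valueCols;
    this.numReducers = rs.numReducers;
  }
}

public class DescriptorPullUpSketch {
  public static void main(String[] args) {
    SinkDescSketch rs = new SinkDescSketch(
        new ArrayList<String>(Arrays.asList("key0")),
        new ArrayList<String>(Arrays.asList("value0")), -1, true);
    LocalSimulativeSinkDescSketch local = new LocalSimulativeSinkDescSketch(rs);
    System.out.println("copied key cols: " + local.keyCols
        + ", path tagging on original: " + rs.getNeedsOperationPathTagging());
  }
}

As in the patch's copy constructor, the sketch copies references to the existing key and value column lists rather than cloning them, so both descriptors describe the same physical reduce output.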