### Eclipse Workspace Patch 1.0
#P hive-trunk-HIVE-2206-startAt20111114
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java	(revision 1205559)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java	(working copy)
@@ -23,6 +23,10 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationManualForwardDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -91,6 +95,14 @@
         HashTableDummyOperator.class));
     opvec.add(new OpTuple(HashTableSinkDesc.class,
         HashTableSinkOperator.class));
+    opvec.add(new OpTuple(CorrelationCompositeDesc.class,
+        CorrelationCompositeOperator.class));
+    opvec.add(new OpTuple(CorrelationManualForwardDesc.class,
+        CorrelationManualForwardOperator.class));
+    opvec.add(new OpTuple(CorrelationReducerDispatchDesc.class,
+        CorrelationReducerDispatchOperator.class));
+    opvec.add(new OpTuple(CorrelationFakeReduceSinkDesc.class,
+        CorrelationFakeReduceSinkOperator.class));
   }
 
   public static Operator get(Class opClass) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorOld.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorOld.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorOld.java	(revision 0)
@@ -0,0 +1,1370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +/** + * Base operator implementation. + **/ +public abstract class OperatorOld implements Serializable, + Node { + + // Bean methods + + private static final long serialVersionUID = 1L; + + protected List> childOperators; + protected List> parentOperators; + protected int numberOfParentOperators; + protected String operatorId; + /** + * List of counter names associated with the operator. It contains the + * following default counters NUM_INPUT_ROWS NUM_OUTPUT_ROWS TIME_TAKEN + * Individual operators can add to this list via addToCounterNames methods. + */ + protected ArrayList counterNames; + + /** + * Each operator has its own map of its counter names to disjoint + * ProgressCounter - it is populated at compile time and is read in at + * run-time while extracting the operator specific counts. + */ + protected HashMap counterNameToEnum; + + private transient ExecMapperContext execContext; + + private static int seqId; + + // It can be optimized later so that an operator operator (init/close) is performed + // only after that operation has been performed on all the parents. This will require + // initializing the whole tree in all the mappers (which might be required for mappers + // spanning multiple files anyway, in future) + /** + * State. + * + */ + public static enum State { + UNINIT, // initialize() has not been called + INIT, // initialize() has been called and close() has not been called, + // or close() has been called but one of its parent is not closed. + CLOSE + // all its parents operators are in state CLOSE and called close() + // to children. Note: close() being called and its state being CLOSE is + // difference since close() could be called but state is not CLOSE if + // one of its parent is not in state CLOSE.. + }; + + protected transient State state = State.UNINIT; + + static transient boolean fatalError = false; // fatalError is shared acorss + // all operators + + static { + seqId = 0; + } + + public OperatorOld() { + id = String.valueOf(seqId++); + } + + public static void resetId() { + seqId = 0; + } + + /** + * Create an operator with a reporter. + * + * @param reporter + * Used to report progress of certain operators. 
+ */ + public OperatorOld(Reporter reporter) { + this.reporter = reporter; + id = String.valueOf(seqId++); + } + + public void setChildOperators( + List> childOperators) { + this.childOperators = childOperators; + } + + public List> getChildOperators() { + return childOperators; + } + + /** + * Implements the getChildren function for the Node Interface. + */ + public ArrayList getChildren() { + + if (getChildOperators() == null) { + return null; + } + + ArrayList ret_vec = new ArrayList(); + for (OperatorOld op : getChildOperators()) { + ret_vec.add(op); + } + + return ret_vec; + } + + public void setParentOperators( + List> parentOperators) { + this.parentOperators = parentOperators; + } + + public List> getParentOperators() { + return parentOperators; + } + + protected T conf; + protected boolean done; + + public void setConf(T conf) { + this.conf = conf; + } + + @Explain + public T getConf() { + return conf; + } + + public boolean getDone() { + return done || fatalError; + } + + public void setDone(boolean done) { + this.done = done; + } + + // non-bean fields needed during compilation + private transient RowSchema rowSchema; + + public void setSchema(RowSchema rowSchema) { + this.rowSchema = rowSchema; + } + + public RowSchema getSchema() { + return rowSchema; + } + + // non-bean .. + + protected transient HashMap, LongWritable> statsMap = new HashMap, LongWritable>(); + protected transient OutputCollector out; + protected transient Log LOG = LogFactory.getLog(this.getClass().getName()); + protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled(); + protected transient String alias; + protected transient Reporter reporter; + protected transient String id; + // object inspectors for input rows + // We will increase the size of the array on demand + protected transient ObjectInspector[] inputObjInspectors = new ObjectInspector[1]; + // for output rows of this operator + protected transient ObjectInspector outputObjInspector; + + /** + * A map of output column name to input expression map. This is used by + * optimizer and built during semantic analysis contains only key elements for + * reduce sink and group by op + */ + protected transient Map colExprMap; + + public void setId(String id) { + this.id = id; + } + + /** + * This function is not named getId(), to make sure java serialization does + * NOT serialize it. Some TestParse tests will fail if we serialize this + * field, since the Operator ID will change based on the number of query + * tests. + */ + public String getIdentifier() { + return id; + } + + public void setReporter(Reporter rep) { + reporter = rep; + + // the collector is same across all operators + if (childOperators == null) { + return; + } + + for (OperatorOld op : childOperators) { + op.setReporter(rep); + } + } + + public void setOutputCollector(OutputCollector out) { + this.out = out; + + // the collector is same across all operators + if (childOperators == null) { + return; + } + + for (OperatorOld op : childOperators) { + op.setOutputCollector(out); + } + } + + /** + * Store the alias this operator is working on behalf of. 
+ */ + public void setAlias(String alias) { + this.alias = alias; + + if (childOperators == null) { + return; + } + + for (OperatorOld op : childOperators) { + op.setAlias(alias); + } + } + + public Map, Long> getStats() { + HashMap, Long> ret = new HashMap, Long>(); + for (Enum one : statsMap.keySet()) { + ret.put(one, Long.valueOf(statsMap.get(one).get())); + } + return (ret); + } + + /** + * checks whether all parent operators are initialized or not. + * + * @return true if there are no parents or all parents are initialized. false + * otherwise + */ + protected boolean areAllParentsInitialized() { + if (parentOperators == null) { + return true; + } + for (OperatorOld parent : parentOperators) { + if (parent == null) { + //return true; + continue; + } + if (parent.state != State.INIT) { + return false; + } + } + return true; + } + + /** + * Initializes operators only if all parents have been initialized. Calls + * operator specific initializer which then initializes child ops. + * + * @param hconf + * @param inputOIs + * input object inspector array indexes by tag id. null value is + * ignored. + * @throws HiveException + */ + public void initialize(Configuration hconf, ObjectInspector[] inputOIs) + throws HiveException { + if (state == State.INIT) { + return; + } + + this.out = null; + if (!areAllParentsInitialized()) { + return; + } + + LOG.info("Initializing Self " + id + " " + getName()); + + if (inputOIs != null) { + inputObjInspectors = inputOIs; + } + + // initialize structure to maintain child op info. operator tree changes + // while + // initializing so this need to be done here instead of initialize() method + if (childOperators != null) { + childOperatorsArray = new OperatorOld[childOperators.size()]; + for (int i = 0; i < childOperatorsArray.length; i++) { + childOperatorsArray[i] = childOperators.get(i); + } + childOperatorsTag = new int[childOperatorsArray.length]; + for (int i = 0; i < childOperatorsArray.length; i++) { + List> parentOperators = childOperatorsArray[i] + .getParentOperators(); + if (parentOperators == null) { + throw new HiveException("Hive internal error: parent is null in " + + childOperatorsArray[i].getClass() + "!"); + } + childOperatorsTag[i] = parentOperators.indexOf(this); + if (childOperatorsTag[i] == -1) { + throw new HiveException( + "Hive internal error: cannot find parent in the child operator!"); + } + } + } + + if(parentOperators == null){ + numberOfParentOperators = 0; + }else{ + numberOfParentOperators = parentOperators.size(); + } + + if (inputObjInspectors.length == 0) { + throw new HiveException("Internal Error during operator initialization."); + } + // derived classes can set this to different object if needed + outputObjInspector = inputObjInspectors[0]; + + //pass the exec context to child operators + passExecContext(this.execContext); + + numOfClosedParentOperators = 0; + + initializeOp(hconf); + LOG.info("Initialization Done " + id + " " + getName()); + } + + public void initializeLocalWork(Configuration hconf) throws HiveException { + if (childOperators != null) { + for (int i =0; i childOp = this.childOperators.get(i); + childOp.initializeLocalWork(hconf); + } + } + } + + /** + * Operator specific initialization. + */ + protected void initializeOp(Configuration hconf) throws HiveException { + initializeChildren(hconf); + } + + /** + * Calls initialize on each of the children with outputObjetInspector as the + * output row format. 
+ */ + protected void initializeChildren(Configuration hconf) throws HiveException { + state = State.INIT; + LOG.info("Operator " + id + " " + getName() + " initialized"); + if (childOperators == null) { + return; + } + LOG.info("Initializing children of " + id + " " + getName()); + for (int i = 0; i < childOperatorsArray.length; i++) { + childOperatorsArray[i].initialize(hconf, outputObjInspector, + childOperatorsTag[i]); + if (reporter != null) { + childOperatorsArray[i].setReporter(reporter); + } + } + } + + /** + * Pass the execContext reference to every child operator + */ + public void passExecContext(ExecMapperContext execContext) { + this.setExecContext(execContext); + if(childOperators != null) { + for (int i = 0; i < childOperators.size(); i++) { + childOperators.get(i).passExecContext(execContext); + } + } + } + + /** + * Collects all the parent's output object inspectors and calls actual + * initialization method. + * + * @param hconf + * @param inputOI + * OI of the row that this parent will pass to this op + * @param parentId + * parent operator id + * @throws HiveException + */ + private void initialize(Configuration hconf, ObjectInspector inputOI, + int parentId) throws HiveException { + LOG.info("Initializing child " + id + " " + getName()); + // Double the size of the array if needed + if (parentId >= inputObjInspectors.length) { + int newLength = inputObjInspectors.length * 2; + while (parentId >= newLength) { + newLength *= 2; + } + inputObjInspectors = Arrays.copyOf(inputObjInspectors, newLength); + } + inputObjInspectors[parentId] = inputOI; + // call the actual operator initialization function + initialize(hconf, null); + } + + public ObjectInspector[] getInputObjInspectors() { + return inputObjInspectors; + } + + public void setInputObjInspectors(ObjectInspector[] inputObjInspectors) { + this.inputObjInspectors = inputObjInspectors; + } + + /** + * Process the row. + * + * @param row + * The object representing the row. + * @param tag + * The tag of the row usually means which parent this row comes from. + * Rows with the same tag should have exactly the same rowInspector + * all the time. + */ + public abstract void processOp(Object row, int tag) throws HiveException; + + /** + * Process the row. + * + * @param row + * The object representing the row. + * @param tag + * The tag of the row usually means which parent this row comes from. + * Rows with the same tag should have exactly the same rowInspector + * all the time. 
+ */ + public void process(Object row, int tag) throws HiveException { + if (fatalError) { + return; + } + preProcessCounter(); + processOp(row, tag); + postProcessCounter(); + } + + // If a operator wants to do some work at the beginning of a group + public void startGroup() throws HiveException { + LOG.debug("Starting group"); + + if (childOperators == null) { + return; + } + + if (fatalError) { + return; + } + + LOG.debug("Starting group for children:"); + for (OperatorOld op : childOperators) { + op.startGroup(); + } + + LOG.debug("Start group Done"); + } + + // If a operator wants to do some work at the end of a group + public void endGroup() throws HiveException { + LOG.debug("Ending group"); + + if (childOperators == null) { + return; + } + + if (fatalError) { + return; + } + + LOG.debug("Ending group for children:"); + for (OperatorOld op : childOperators) { + op.endGroup(); + } + + LOG.debug("End group Done"); + } + + protected boolean allInitializedParentsAreClosed() { + if (parentOperators != null) { + for (OperatorOld parent : parentOperators) { + if(parent==null){ + continue; + } + if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) { + return false; + } + } + } + return true; + } + + // the number of parent operators that has been closed + transient int numOfClosedParentOperators; + + public int getNumOfClosedParentOperators(){ + return numOfClosedParentOperators; + } + + // This close() function does not need to be synchronized + // since it is called by its parents' main thread, so no + // more than 1 thread should call this close() function. + public void close(boolean abort) throws HiveException { + + if (state == State.CLOSE) { + return; + } + + // check if all parents are finished + if (!allInitializedParentsAreClosed()) { + numOfClosedParentOperators++; + return; + } + + // set state as CLOSE as long as all parents are closed + // state == CLOSE doesn't mean all children are also in state CLOSE + state = State.CLOSE; + LOG.info(id + " finished. closing... "); + + if (counterNameToEnum != null) { + incrCounter(numInputRowsCntr, inputRows); + incrCounter(numOutputRowsCntr, outputRows); + incrCounter(timeTakenCntr, totalTime); + } + + LOG.info(id + " forwarded " + cntr + " rows"); + + // call the operator specific close routine + closeOp(abort); + numOfClosedParentOperators = 0; + + try { + logStats(); + if (childOperators == null) { + return; + } + + for (OperatorOld op : childOperators) { + op.close(abort); + } + + LOG.info(id + " Close done"); + } catch (HiveException e) { + e.printStackTrace(); + throw e; + } + } + + /** + * Operator specific close routine. Operators which inherents this class + * should overwrite this funtion for their specific cleanup routine. + */ + protected void closeOp(boolean abort) throws HiveException { + } + + /** + * Unlike other operator interfaces which are called from map or reduce task, + * jobClose is called from the jobclient side once the job has completed. + * + * @param conf + * Configuration with with which job was submitted + * @param success + * whether the job was completed successfully or not + */ + public void jobClose(Configuration conf, boolean success, JobCloseFeedBack feedBack) + throws HiveException { + if (childOperators == null) { + return; + } + + for (OperatorOld op : childOperators) { + op.jobClose(conf, success, feedBack); + } + } + + /** + * Cache childOperators in an array for faster access. childOperatorsArray is + * accessed per row, so it's important to make the access efficient. 
+ */ + protected transient OperatorOld[] childOperatorsArray = null; + protected transient int[] childOperatorsTag; + + // counters for debugging + private transient long cntr = 0; + private transient long nextCntr = 1; + + /** + * Replace one child with another at the same position. The parent of the + * child is not changed + * + * @param child + * the old child + * @param newChild + * the new child + */ + public void replaceChild(OperatorOld child, + OperatorOld newChild) { + int childIndex = childOperators.indexOf(child); + assert childIndex != -1; + childOperators.set(childIndex, newChild); + } + + public void removeChild(OperatorOld child) { + int childIndex = childOperators.indexOf(child); + assert childIndex != -1; + if (childOperators.size() == 1) { + childOperators = null; + } else { + childOperators.remove(childIndex); + } + + int parentIndex = child.getParentOperators().indexOf(this); + assert parentIndex != -1; + if (child.getParentOperators().size() == 1) { + child.setParentOperators(null); + } else { + child.getParentOperators().remove(parentIndex); + } + } + + /** + * Remove a child and add all of the child's children to the location of the child + * + * @param child If this operator is not the only parent of the child. There can be unpredictable result. + * @throws SemanticException + */ + public void removeChildAndAdoptItsChildren(OperatorOld child) throws SemanticException { + int childIndex = childOperators.indexOf(child); + if (childIndex == -1) { + throw new SemanticException( + "Exception when trying to remove partition predicates: fail to find child from parent"); + } + + childOperators.remove(childIndex); + if (child.getChildOperators() != null && + child.getChildOperators().size() > 0) { + childOperators.addAll(childIndex, child.getChildOperators()); + } + + for (OperatorOld gc : child.getChildOperators()) { + List> parents = gc.getParentOperators(); + int index = parents.indexOf(child); + if (index == -1) { + throw new SemanticException( + "Exception when trying to remove partition predicates: fail to find parent from child"); + } + parents.set(index, this); + } + } + + public void removeParent(OperatorOld parent) { + int parentIndex = parentOperators.indexOf(parent); + assert parentIndex != -1; + if (parentOperators.size() == 1) { + parentOperators = null; + } + else { + parentOperators.remove(parentIndex); + } + + int childIndex = parent.getChildOperators().indexOf(this); + if (childIndex >= 0) { + if (parent.getChildOperators().size() == 1) { + parent.setChildOperators(null); + } + else { + parent.getChildOperators().remove(childIndex); + } + } + } + + /** + * Replace one parent with another at the same position. Chilren of the new + * parent are not updated + * + * @param parent + * the old parent + * @param newParent + * the new parent + */ + public void replaceParent(OperatorOld parent, + OperatorOld newParent) { + int parentIndex = parentOperators.indexOf(parent); + assert parentIndex != -1; + parentOperators.set(parentIndex, newParent); + } + + private long getNextCntr(long cntr) { + // A very simple counter to keep track of number of rows processed by an + // operator. 
It dumps + // every 1 million times, and quickly before that + if (cntr >= 1000000) { + return cntr + 1000000; + } + + return 10 * cntr; + } + + protected void forward(Object row, ObjectInspector rowInspector) + throws HiveException { + + if ((++outputRows % 1000) == 0) { + if (counterNameToEnum != null) { + incrCounter(numOutputRowsCntr, outputRows); + outputRows = 0; + } + } + + if (isLogInfoEnabled) { + cntr++; + if (cntr == nextCntr) { + LOG.info(id + " forwarding " + cntr + " rows"); + nextCntr = getNextCntr(cntr); + } + } + + // For debugging purposes: + // System.out.println("" + this.getClass() + ": " + + // SerDeUtils.getJSONString(row, rowInspector)); + // System.out.println("" + this.getClass() + ">> " + + // ObjectInspectorUtils.getObjectInspectorName(rowInspector)); + + if (childOperatorsArray == null && childOperators != null) { + throw new HiveException( + "Internal Hive error during operator initialization."); + } + + if ((childOperatorsArray == null) || (getDone())) { + return; + } + + int childrenDone = 0; + for (int i = 0; i < childOperatorsArray.length; i++) { + OperatorOld o = childOperatorsArray[i]; + if (o.getDone()) { + childrenDone++; + } else { + o.process(row, childOperatorsTag[i]); + } + } + + // if all children are done, this operator is also done + if (childrenDone == childOperatorsArray.length) { + setDone(true); + } + } + + public void resetStats() { + for (Enum e : statsMap.keySet()) { + statsMap.get(e).set(0L); + } + } + + public void reset(){ + this.state=State.INIT; + if (childOperators != null) { + for (OperatorOld o : childOperators) { + o.reset(); + } + } + + } + + /** + * OperatorFunc. + * + */ + public static interface OperatorFunc { + void func(OperatorOld op); + } + + public void preorderMap(OperatorFunc opFunc) { + opFunc.func(this); + if (childOperators != null) { + for (OperatorOld o : childOperators) { + o.preorderMap(opFunc); + } + } + } + + public void logStats() { + for (Enum e : statsMap.keySet()) { + LOG.info(e.toString() + ":" + statsMap.get(e).toString()); + } + } + + /** + * Implements the getName function for the Node Interface. + * + * @return the name of the operator + */ + public String getName() { + return "OP"; + } + + /** + * Returns a map of output column name to input expression map Note that + * currently it returns only key columns for ReduceSink and GroupBy operators. 
+ * + * @return null if the operator doesn't change columns + */ + public Map getColumnExprMap() { + return colExprMap; + } + + public void setColumnExprMap(Map colExprMap) { + this.colExprMap = colExprMap; + } + + private String getLevelString(int level) { + if (level == 0) { + return "\n"; + } + StringBuilder s = new StringBuilder(); + s.append("\n"); + while (level > 0) { + s.append(" "); + level--; + } + return s.toString(); + } + + public String dump(int level) { + return dump(level, new HashSet()); + } + + public String dump(int level, HashSet seenOpts) { + if (seenOpts.contains(new Integer(id))) { + return null; + } + seenOpts.add(new Integer(id)); + + StringBuilder s = new StringBuilder(); + String ls = getLevelString(level); + s.append(ls); + s.append("<" + getName() + ">"); + s.append("Id =" + id); + + if (childOperators != null) { + s.append(ls); + s.append(" "); + for (OperatorOld o : childOperators) { + s.append(o.dump(level + 2, seenOpts)); + } + s.append(ls); + s.append(" <\\Children>"); + } + + if (parentOperators != null) { + s.append(ls); + s.append(" "); + for (OperatorOld o : parentOperators) { + s.append("Id = " + o.id + " "); + s.append(o.dump(level, seenOpts)); + } + s.append("<\\Parent>"); + } + + s.append(ls); + s.append("<\\" + getName() + ">"); + return s.toString(); + } + + /** + * Initialize an array of ExprNodeEvaluator and return the result + * ObjectInspectors. + */ + protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, + ObjectInspector rowInspector) throws HiveException { + ObjectInspector[] result = new ObjectInspector[evals.length]; + for (int i = 0; i < evals.length; i++) { + result[i] = evals[i].initialize(rowInspector); + } + return result; + } + + /** + * Initialize an array of ExprNodeEvaluator from start, for specified length + * and return the result ObjectInspectors. + */ + protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, + int start, int length, + ObjectInspector rowInspector) throws HiveException { + ObjectInspector[] result = new ObjectInspector[length]; + for (int i = 0; i < length; i++) { + result[i] = evals[start + i].initialize(rowInspector); + } + return result; + } + + /** + * Initialize an array of ExprNodeEvaluator and put the return values into a + * StructObjectInspector with integer field names. + */ + protected static StructObjectInspector initEvaluatorsAndReturnStruct( + ExprNodeEvaluator[] evals, List outputColName, + ObjectInspector rowInspector) throws HiveException { + ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, + rowInspector); + return ObjectInspectorFactory.getStandardStructObjectInspector( + outputColName, Arrays.asList(fieldObjectInspectors)); + } + + /** + * All counter stuff below this + */ + + /** + * TODO This is a hack for hadoop 0.17 which only supports enum counters. 
+ */ + public static enum ProgressCounter { + CREATED_FILES, + C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, + C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, + C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, + C31, C32, C33, C34, C35, C36, C37, C38, C39, C40, + C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, + C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, + C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, + C71, C72, C73, C74, C75, C76, C77, C78, C79, C80, + C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, + C91, C92, C93, C94, C95, C96, C97, C98, C99, C100, + C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, + C111, C112, C113, C114, C115, C116, C117, C118, C119, C120, + C121, C122, C123, C124, C125, C126, C127, C128, C129, C130, + C131, C132, C133, C134, C135, C136, C137, C138, C139, C140, + C141, C142, C143, C144, C145, C146, C147, C148, C149, C150, + C151, C152, C153, C154, C155, C156, C157, C158, C159, C160, + C161, C162, C163, C164, C165, C166, C167, C168, C169, C170, + C171, C172, C173, C174, C175, C176, C177, C178, C179, C180, + C181, C182, C183, C184, C185, C186, C187, C188, C189, C190, + C191, C192, C193, C194, C195, C196, C197, C198, C199, C200, + C201, C202, C203, C204, C205, C206, C207, C208, C209, C210, + C211, C212, C213, C214, C215, C216, C217, C218, C219, C220, + C221, C222, C223, C224, C225, C226, C227, C228, C229, C230, + C231, C232, C233, C234, C235, C236, C237, C238, C239, C240, + C241, C242, C243, C244, C245, C246, C247, C248, C249, C250, + C251, C252, C253, C254, C255, C256, C257, C258, C259, C260, + C261, C262, C263, C264, C265, C266, C267, C268, C269, C270, + C271, C272, C273, C274, C275, C276, C277, C278, C279, C280, + C281, C282, C283, C284, C285, C286, C287, C288, C289, C290, + C291, C292, C293, C294, C295, C296, C297, C298, C299, C300, + C301, C302, C303, C304, C305, C306, C307, C308, C309, C310, + C311, C312, C313, C314, C315, C316, C317, C318, C319, C320, + C321, C322, C323, C324, C325, C326, C327, C328, C329, C330, + C331, C332, C333, C334, C335, C336, C337, C338, C339, C340, + C341, C342, C343, C344, C345, C346, C347, C348, C349, C350, + C351, C352, C353, C354, C355, C356, C357, C358, C359, C360, + C361, C362, C363, C364, C365, C366, C367, C368, C369, C370, + C371, C372, C373, C374, C375, C376, C377, C378, C379, C380, + C381, C382, C383, C384, C385, C386, C387, C388, C389, C390, + C391, C392, C393, C394, C395, C396, C397, C398, C399, C400, + C401, C402, C403, C404, C405, C406, C407, C408, C409, C410, + C411, C412, C413, C414, C415, C416, C417, C418, C419, C420, + C421, C422, C423, C424, C425, C426, C427, C428, C429, C430, + C431, C432, C433, C434, C435, C436, C437, C438, C439, C440, + C441, C442, C443, C444, C445, C446, C447, C448, C449, C450, + C451, C452, C453, C454, C455, C456, C457, C458, C459, C460, + C461, C462, C463, C464, C465, C466, C467, C468, C469, C470, + C471, C472, C473, C474, C475, C476, C477, C478, C479, C480, + C481, C482, C483, C484, C485, C486, C487, C488, C489, C490, + C491, C492, C493, C494, C495, C496, C497, C498, C499, C500, + C501, C502, C503, C504, C505, C506, C507, C508, C509, C510, + C511, C512, C513, C514, C515, C516, C517, C518, C519, C520, + C521, C522, C523, C524, C525, C526, C527, C528, C529, C530, + C531, C532, C533, C534, C535, C536, C537, C538, C539, C540, + C541, C542, C543, C544, C545, C546, C547, C548, C549, C550, + C551, C552, C553, C554, C555, C556, C557, C558, C559, C560, + C561, C562, C563, C564, C565, C566, C567, C568, C569, C570, + C571, C572, C573, C574, C575, C576, C577, C578, C579, C580, + 
C581, C582, C583, C584, C585, C586, C587, C588, C589, C590, + C591, C592, C593, C594, C595, C596, C597, C598, C599, C600, + C601, C602, C603, C604, C605, C606, C607, C608, C609, C610, + C611, C612, C613, C614, C615, C616, C617, C618, C619, C620, + C621, C622, C623, C624, C625, C626, C627, C628, C629, C630, + C631, C632, C633, C634, C635, C636, C637, C638, C639, C640, + C641, C642, C643, C644, C645, C646, C647, C648, C649, C650, + C651, C652, C653, C654, C655, C656, C657, C658, C659, C660, + C661, C662, C663, C664, C665, C666, C667, C668, C669, C670, + C671, C672, C673, C674, C675, C676, C677, C678, C679, C680, + C681, C682, C683, C684, C685, C686, C687, C688, C689, C690, + C691, C692, C693, C694, C695, C696, C697, C698, C699, C700, + C701, C702, C703, C704, C705, C706, C707, C708, C709, C710, + C711, C712, C713, C714, C715, C716, C717, C718, C719, C720, + C721, C722, C723, C724, C725, C726, C727, C728, C729, C730, + C731, C732, C733, C734, C735, C736, C737, C738, C739, C740, + C741, C742, C743, C744, C745, C746, C747, C748, C749, C750, + C751, C752, C753, C754, C755, C756, C757, C758, C759, C760, + C761, C762, C763, C764, C765, C766, C767, C768, C769, C770, + C771, C772, C773, C774, C775, C776, C777, C778, C779, C780, + C781, C782, C783, C784, C785, C786, C787, C788, C789, C790, + C791, C792, C793, C794, C795, C796, C797, C798, C799, C800, + C801, C802, C803, C804, C805, C806, C807, C808, C809, C810, + C811, C812, C813, C814, C815, C816, C817, C818, C819, C820, + C821, C822, C823, C824, C825, C826, C827, C828, C829, C830, + C831, C832, C833, C834, C835, C836, C837, C838, C839, C840, + C841, C842, C843, C844, C845, C846, C847, C848, C849, C850, + C851, C852, C853, C854, C855, C856, C857, C858, C859, C860, + C861, C862, C863, C864, C865, C866, C867, C868, C869, C870, + C871, C872, C873, C874, C875, C876, C877, C878, C879, C880, + C881, C882, C883, C884, C885, C886, C887, C888, C889, C890, + C891, C892, C893, C894, C895, C896, C897, C898, C899, C900, + C901, C902, C903, C904, C905, C906, C907, C908, C909, C910, + C911, C912, C913, C914, C915, C916, C917, C918, C919, C920, + C921, C922, C923, C924, C925, C926, C927, C928, C929, C930, + C931, C932, C933, C934, C935, C936, C937, C938, C939, C940, + C941, C942, C943, C944, C945, C946, C947, C948, C949, C950, + C951, C952, C953, C954, C955, C956, C957, C958, C959, C960, + C961, C962, C963, C964, C965, C966, C967, C968, C969, C970, + C971, C972, C973, C974, C975, C976, C977, C978, C979, C980, + C981, C982, C983, C984, C985, C986, C987, C988, C989, C990, + C991, C992, C993, C994, C995, C996, C997, C998, C999, C1000 + }; + + private static int totalNumCntrs = 1000; + + /** + * populated at runtime from hadoop counters at run time in the client. + */ + protected transient HashMap counters; + + /** + * keeps track of unique ProgressCounter enums used this value is used at + * compile time while assigning ProgressCounter enums to counter names. + */ + private static int lastEnumUsed; + + protected transient long inputRows = 0; + protected transient long outputRows = 0; + protected transient long beginTime = 0; + protected transient long totalTime = 0; + + protected transient Object groupKeyObject; + + /** + * this is called before operator process to buffer some counters. 
+ */ + private void preProcessCounter() { + inputRows++; + + if (counterNameToEnum != null) { + if ((inputRows % 1000) == 0) { + incrCounter(numInputRowsCntr, inputRows); + incrCounter(timeTakenCntr, totalTime); + inputRows = 0; + totalTime = 0; + } + beginTime = System.currentTimeMillis(); + } + } + + /** + * this is called after operator process to buffer some counters. + */ + private void postProcessCounter() { + if (counterNameToEnum != null) { + totalTime += (System.currentTimeMillis() - beginTime); + } + } + + /** + * this is called in operators in map or reduce tasks. + * + * @param name + * @param amount + */ + protected void incrCounter(String name, long amount) { + String counterName = "CNTR_NAME_" + getOperatorId() + "_" + name; + ProgressCounter pc = counterNameToEnum.get(counterName); + + // Currently, we maintain fixed number of counters per plan - in case of a + // bigger tree, we may run out of them + if (pc == null) { + LOG + .warn("Using too many counters. Increase the total number of counters for " + + counterName); + } else if (reporter != null) { + reporter.incrCounter(pc, amount); + } + } + + public ArrayList getCounterNames() { + return counterNames; + } + + public void setCounterNames(ArrayList counterNames) { + this.counterNames = counterNames; + } + + public String getOperatorId() { + return operatorId; + } + + public void initOperatorId() { + setOperatorId(getName() + "_" + this.id); + } + + public void setOperatorId(String operatorId) { + this.operatorId = operatorId; + } + + public HashMap getCounters() { + return counters; + } + + /** + * called in ExecDriver.progress periodically. + * + * @param ctrs + * counters from the running job + */ + @SuppressWarnings("unchecked") + public void updateCounters(Counters ctrs) { + if (counters == null) { + counters = new HashMap(); + } + + // For some old unit tests, the counters will not be populated. Eventually, + // the old tests should be removed + if (counterNameToEnum == null) { + return; + } + + for (Map.Entry counter : counterNameToEnum + .entrySet()) { + counters.put(counter.getKey(), ctrs.getCounter(counter.getValue())); + } + // update counters of child operators + // this wont be an infinite loop since the operator graph is acyclic + // but, some operators may be updated more than once and that's ok + if (getChildren() != null) { + for (Node op : getChildren()) { + ((OperatorOld) op).updateCounters(ctrs); + } + } + } + + /** + * Recursively check this operator and its descendants to see if the fatal + * error counter is set to non-zero. + * + * @param ctrs + */ + public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { + if (counterNameToEnum == null) { + return false; + } + + String counterName = "CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr; + ProgressCounter pc = counterNameToEnum.get(counterName); + + // Currently, we maintain fixed number of counters per plan - in case of a + // bigger tree, we may run out of them + if (pc == null) { + LOG + .warn("Using too many counters. Increase the total number of counters for " + + counterName); + } else { + long value = ctrs.getCounter(pc); + fatalErrorMessage(errMsg, value); + if (value != 0) { + return true; + } + } + + if (getChildren() != null) { + for (Node op : getChildren()) { + if (((OperatorOld) op).checkFatalErrors(ctrs, + errMsg)) { + return true; + } + } + } + return false; + } + + /** + * Get the fatal error message based on counter's code. + * + * @param errMsg + * error message should be appended to this output parameter. 
+ * @param counterValue + * input counter code. + */ + protected void fatalErrorMessage(StringBuilder errMsg, long counterValue) { + } + + // A given query can have multiple map-reduce jobs + public static void resetLastEnumUsed() { + lastEnumUsed = 0; + } + + /** + * Called only in SemanticAnalyzer after all operators have added their own + * set of counter names. + */ + public void assignCounterNameToEnum() { + if (counterNameToEnum != null) { + return; + } + counterNameToEnum = new HashMap(); + for (String counterName : getCounterNames()) { + ++lastEnumUsed; + + // TODO Hack for hadoop-0.17 + // Currently, only maximum number of 'totalNumCntrs' can be used. If you + // want + // to add more counters, increase the number of counters in + // ProgressCounter + if (lastEnumUsed > totalNumCntrs) { + LOG + .warn("Using too many counters. Increase the total number of counters"); + return; + } + String enumName = "C" + lastEnumUsed; + ProgressCounter ctr = ProgressCounter.valueOf(enumName); + counterNameToEnum.put(counterName, ctr); + } + } + + protected static String numInputRowsCntr = "NUM_INPUT_ROWS"; + protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS"; + protected static String timeTakenCntr = "TIME_TAKEN"; + protected static String fatalErrorCntr = "FATAL_ERROR"; + + public void initializeCounters() { + initOperatorId(); + counterNames = new ArrayList(); + counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numInputRowsCntr); + counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numOutputRowsCntr); + counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + timeTakenCntr); + counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr); + List newCntrs = getAdditionalCounters(); + if (newCntrs != null) { + counterNames.addAll(newCntrs); + } + } + + /* + * By default, the list is empty - if an operator wants to add more counters, + * it should override this method and provide the new list. + */ + private List getAdditionalCounters() { + return null; + } + + public HashMap getCounterNameToEnum() { + return counterNameToEnum; + } + + public void setCounterNameToEnum( + HashMap counterNameToEnum) { + this.counterNameToEnum = counterNameToEnum; + } + + /** + * Return the type of the specific operator among the + * types in OperatorType. + * + * @return OperatorType.* + */ + abstract public OperatorType getType(); + + public void setGroupKeyObject(Object keyObject) { + this.groupKeyObject = keyObject; + } + + public Object getGroupKeyObject() { + return groupKeyObject; + } + + /** + * Called during semantic analysis as operators are being added + * in order to give them a chance to compute any additional plan information + * needed. Does nothing by default. 
+   */
+  public void augmentPlan() {
+  }
+
+  public ExecMapperContext getExecContext() {
+    return execContext;
+  }
+
+  public void setExecContext(ExecMapperContext execContext) {
+    this.execContext = execContext;
+    if (this.childOperators != null) {
+      for (int i = 0; i < this.childOperators.size(); i++) {
+        OperatorOld<? extends Serializable> op = this.childOperators.get(i);
+        op.setExecContext(execContext);
+      }
+    }
+  }
+
+  // The input file has changed - every operator can invoke specific action
+  // for each input file
+  public void cleanUpInputFileChanged() throws HiveException {
+    this.cleanUpInputFileChangedOp();
+    if (this.childOperators != null) {
+      for (int i = 0; i < this.childOperators.size(); i++) {
+        OperatorOld<? extends Serializable> op = this.childOperators.get(i);
+        op.cleanUpInputFileChanged();
+      }
+    }
+  }
+
+  // If an operator needs to invoke specific cleanup, that operator can override
+  // this method
+  public void cleanUpInputFileChangedOp() throws HiveException {
+  }
+
+  protected BytesWritable bytesWritableGroupKey;
+
+  public void setBytesWritableGroupKey(BytesWritable groupKey) {
+    if (bytesWritableGroupKey == null) {
+      bytesWritableGroupKey = new BytesWritable();
+    }
+    bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize());
+  }
+
+  public BytesWritable getBytesWritableGroupKey() {
+    return bytesWritableGroupKey;
+  }
+
+}
Index: ql/src/test/results/clientpositive/show_functions.q.out
===================================================================
--- ql/src/test/results/clientpositive/show_functions.q.out	(revision 1205559)
+++ ql/src/test/results/clientpositive/show_functions.q.out	(working copy)
@@ -189,6 +189,7 @@
 corr
 cos
 count
+count_distinct
 covar_pop
 covar_samp
 create_union
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java	(revision 0)
@@ -0,0 +1,727 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.CorrelationCompositeOperator; +import org.apache.hadoop.hive.ql.exec.CorrelationFakeReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.CorrelationOptimizer.IntraQueryCorrelation; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.ForwardDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; + + +public final class CorrelationOptimizerUtils { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizerUtils.class.getName()); + + public static boolean isExisted(ExprNodeDesc expr, ArrayList col_list) { + for (ExprNodeDesc thisExpr: col_list) { + if (expr.getExprString().equals(thisExpr.getExprString())) { + return true; + } + } + return false; + } + + public static String getColumnName(Map opColumnExprMap, ExprNodeDesc expr) { + for (Entry entry: opColumnExprMap.entrySet()) { + if (expr.getExprString().equals(entry.getValue().getExprString())) { + return entry.getKey(); + } + } + return null; + } + + + public static Operator unionUsedColumnsAndMakeNewSelect(ArrayList rsops, + IntraQueryCorrelation correlation, LinkedHashMap, + Map> originalOpColumnExprMap, TableScanOperator input, ParseContext pGraphContext, + LinkedHashMap, OpParseContext> originalOpParseCtx) { + + ArrayList columnNames = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList col_list = new ArrayList(); + RowResolver out_rwsch = new RowResolver(); + boolean isSelectAll = false; + + int pos = 0; + for (ReduceSinkOperator rsop: rsops) { + Operator curr = correlation.getBottom2TSops().get(rsop).get(0).getChildOperators().get(0); + while(true) { + if (curr.getName().equals("SEL")) { + SelectOperator selOp = (SelectOperator)curr; 
+ if (selOp.getColumnExprMap() != null) { + for (Entry entry: selOp.getColumnExprMap().entrySet()) { + ExprNodeDesc expr = entry.getValue(); + if (!isExisted(expr, col_list) && originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().containsKey(entry.getKey())) { + col_list.add(expr); + String[] colRef = originalOpParseCtx.get(selOp).getRowResolver().getInvRslvMap().get(entry.getKey()); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = entry.getKey(); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + pos++; + columnNames.add(outputName); + colExprMap.put(outputName, expr); + } + } + } else { + for (ExprNodeDesc expr: selOp.getConf().getColList()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + String[] colRef = pGraphContext.getOpParseCtx().get(selOp).getRowResolver().getInvRslvMap().get(expr.getCols().get(0)); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + } + break; + } else if (curr.getName().equals("FIL")) { + isSelectAll = true; + break; + } else if (curr.getName().equals("RS")) { + ReduceSinkOperator thisRSop = (ReduceSinkOperator)curr; + for (ExprNodeDesc expr: thisRSop.getConf().getKeyCols()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + for (ExprNodeDesc expr: thisRSop.getConf().getValueCols()) { + if (!isExisted(expr, col_list)) { + col_list.add(expr); + assert expr.getCols().size() == 1; + String columnName = getColumnName(originalOpColumnExprMap.get(thisRSop), expr); + String[] colRef = pGraphContext.getOpParseCtx().get(thisRSop).getRowResolver().getInvRslvMap().get(columnName); + String tabAlias = colRef[0]; + String colAlias = colRef[1]; + String outputName = expr.getCols().get(0); + out_rwsch.put(tabAlias, colAlias, new ColumnInfo( + outputName, expr.getTypeInfo(), tabAlias, false)); + columnNames.add(outputName); + colExprMap.put(outputName, expr); + pos++; + } + } + + break; + } else { + curr = curr.getChildOperators().get(0); + } + } + } + + Operator output; + if (isSelectAll) { + output = input; + } else { + output = putOpInsertMap(OperatorFactory.getAndMakeChild( + new SelectDesc(col_list, columnNames, false), new RowSchema( + out_rwsch.getColumnInfos()), input), out_rwsch, pGraphContext.getOpParseCtx()); + output.setColumnExprMap(colExprMap); + output.setChildOperators(Utilities.makeList()); + + } + + return output; + } + + + public static Operator putOpInsertMap(Operator op, + RowResolver rr, LinkedHashMap, OpParseContext> opParseCtx) { + OpParseContext ctx = new OpParseContext(rr); + opParseCtx.put(op, ctx); + op.augmentPlan(); + return op; + } + + public static HashMap, String> getAliasIDtTopOps(HashMap> topOps) { + HashMap, String> aliasIDtTopOps = new HashMap, String>(); + 
for (Entry> entry: topOps.entrySet()) { + assert !aliasIDtTopOps.containsKey(entry.getValue()); + aliasIDtTopOps.put(entry.getValue(), entry.getKey()); + } + return aliasIDtTopOps; + } + + public static ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + public static ArrayList findPeerFakeReduceSinkOperators(CorrelationFakeReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((CorrelationFakeReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + // find how many layer's of Fake reduce sink + public static int getPostComputationDepth(IntraQueryCorrelation correlation) { + int depth = 0; + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + ReduceSinkOperator op = rsop; + int layer = 0; + while(!correlation.getTopReduceSinkOperators().contains(op)) { + assert correlation.getDown2upRSops().get(op).size() == 1; + op = correlation.getDown2upRSops().get(op).get(0); + layer++; + } + if (layer > depth) { + depth = layer; + } + } + assert depth >= 1; + return depth; + } + + + public static int getPostComputationDepthOfThisPlan(ReduceSinkOperator rsop, IntraQueryCorrelation correlation) { + int depth = 0; + ReduceSinkOperator op = rsop; + while(!correlation.getTopReduceSinkOperators().contains(op)) { + assert correlation.getDown2upRSops().get(op).size() == 1; + op = correlation.getDown2upRSops().get(op).get(0); + depth++; + } + assert depth >= 1; + return depth; + } + + public static ParseContext applyCorrelation(IntraQueryCorrelation correlation, ParseContext inputpGraphContext, + LinkedHashMap, Map> originalOpColumnExprMap, + LinkedHashMap, RowResolver> originalOpRowResolver, + Map groupbyRegular2MapSide, + LinkedHashMap, OpParseContext> originalOpParseCtx) { + + ParseContext pGraphContext = inputpGraphContext; + + // 0: if necessary, replace RS-GBY to GBY-RS-GBY. 
In GBY-RS-GBY, the first GBY is in type of hash, so it can group records + LOG.info("apply correlation step 0: replace RS-GBY to GBY-RS-GBY"); + for (ReduceSinkOperator rsop: correlation.getRSGBYToBeReplacedByGBYRSGBY()) { + LOG.info("operator " + rsop.getIdentifier() + " should be replaced"); + assert !correlation.getBottomReduceSinkOperators().contains(rsop); + GroupByOperator mapSideGBY = groupbyRegular2MapSide.get(rsop); + assert (mapSideGBY.getChildOperators().get(0).getChildOperators().get(0) instanceof GroupByOperator); + ReduceSinkOperator newRsop = (ReduceSinkOperator)mapSideGBY.getChildOperators().get(0); + GroupByOperator reduceSideGBY = (GroupByOperator)newRsop.getChildOperators().get(0); + GroupByOperator oldReduceSideGBY = (GroupByOperator)rsop.getChildOperators().get(0); + List> parents = rsop.getParentOperators(); + List> children = oldReduceSideGBY.getChildOperators(); + mapSideGBY.setParentOperators(parents); + for (Operator parent: parents) { + parent.replaceChild(rsop, mapSideGBY); + } + reduceSideGBY.setChildOperators(children); + for (Operator child: children) { + child.replaceParent(oldReduceSideGBY, reduceSideGBY); + } + correlation.getAllReduceSinkOperators().remove(rsop); + correlation.getAllReduceSinkOperators().add(newRsop); + } + + + Operator curr; + + // 1: Create table scan operator + LOG.info("apply correlation step 1: create table scan operator"); + HashMap oldTSOP2newTSOP = new HashMap(); + HashMap> oldTopOps = pGraphContext.getTopOps(); + HashMap, String> oldAliasIDtTopOps = getAliasIDtTopOps(oldTopOps); + HashMap oldTopToTable = pGraphContext.getTopToTable(); + HashMap> addedTopOps = new HashMap>(); + HashMap addedTopToTable = new HashMap(); + for (Entry> entry: correlation.getTable2CorrelatedTSops().entrySet()) { + TableScanOperator oldTSop = entry.getValue().get(0); + TableScanDesc tsDesc = new TableScanDesc(oldTSop.getConf().getAlias(), oldTSop.getConf().getVirtualCols()); + OpParseContext opParseCtx= pGraphContext.getOpParseCtx().get(oldTSop); + Operator top = putOpInsertMap(OperatorFactory.get(tsDesc, + new RowSchema(opParseCtx.getRowResolver().getColumnInfos())), + opParseCtx.getRowResolver(), pGraphContext.getOpParseCtx()); + top.setParentOperators(null); + top.setChildOperators(Utilities.makeList()); + for (TableScanOperator tsop: entry.getValue()) { + addedTopOps.put(oldAliasIDtTopOps.get(tsop), top); + addedTopToTable.put((TableScanOperator) top, oldTopToTable.get(tsop)); + oldTSOP2newTSOP.put(tsop, (TableScanOperator)top); + } + } + + int postComputationDepth = getPostComputationDepth(correlation); + ArrayList> childrenOfDispatch = new ArrayList>(); + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + int thisPostComputationDepth = getPostComputationDepthOfThisPlan(rsop, correlation); + // TODO: currently, correlation optimizer can not handle the case that + // a table is directly connected to a operator. 
+ assert correlation.getBottomReduceSinkOperators().containsAll(findPeerReduceSinkOperators(rsop)); + Operator op = rsop.getChildOperators().get(0); + if (!childrenOfDispatch.contains(op)) { + LOG.info("Add :" + op.getIdentifier() + " " + op.getName() + " to the children list of dispatch operator"); + childrenOfDispatch.add(op); + } + } + + int opTag = 0; + HashMap operationPath2CorrelationReduceSinkOps = new HashMap(); + for (Entry> entry: correlation.getTable2CorrelatedRSops().entrySet()) { + + // 2: Create select operator for shared op plans + LOG.info("apply correlation step 2: create select operator for shared operation path for the table of " + entry.getKey()); + curr = unionUsedColumnsAndMakeNewSelect(entry.getValue(), correlation, originalOpColumnExprMap, + oldTSOP2newTSOP.get(correlation.getBottom2TSops().get(entry.getValue().get(0)).get(0)), pGraphContext, + originalOpParseCtx); + + // 3: Create CorrelationCompositeOperator, CorrelationReduceSinkOperator + LOG.info("apply correlation step 3: create correlation composite Operator and correlation reduce sink operator for the table of " + entry.getKey()); + curr = createCorrelationCompositeReducesinkOperaotr( + correlation.getTable2CorrelatedTSops().get(entry.getKey()), entry.getValue(), correlation, curr, pGraphContext, + childrenOfDispatch, entry.getKey(), originalOpColumnExprMap, opTag, originalOpRowResolver); + + operationPath2CorrelationReduceSinkOps.put(new Integer(opTag), (ReduceSinkOperator)curr); + opTag++; + } + + + // 4: Create correlation dispatch operator for operation paths + LOG.info("apply correlation step 4: create correlation dispatch operator for operation paths"); + RowResolver outputRS = new RowResolver(); + List> correlationReduceSinkOps = new ArrayList>(); + for (Entry entry: operationPath2CorrelationReduceSinkOps.entrySet()) { + Integer opTagInteger = entry.getKey(); + curr = entry.getValue(); + correlationReduceSinkOps.add((ReduceSinkOperator)curr); + RowResolver inputRS = pGraphContext.getOpParseCtx().get(curr).getRowResolver(); + for (Entry> e1: inputRS.getRslvMap().entrySet()) { + for (Entry e2: e1.getValue().entrySet()) { + outputRS.put(e1.getKey(), e2.getKey(), e2.getValue()); + } + } + } + + Operator dispatchOp = putOpInsertMap(OperatorFactory.get( + new CorrelationReducerDispatchDesc(correlation.getDispatchConf(), correlation.getDispatchKeySelectDescConf(), correlation.getDispatchValueSelectDescConf()), + new RowSchema(outputRS.getColumnInfos())), + outputRS, pGraphContext.getOpParseCtx()); + + dispatchOp.setParentOperators(correlationReduceSinkOps); + for (Operator thisOp: correlationReduceSinkOps) { + thisOp.setChildOperators(Utilities.makeList(dispatchOp)); + } + + // 5: Replace the old plan in the original plan tree with new plan + LOG.info("apply correlation step 5: Replace the old plan in the original plan tree with the new plan"); + HashSet> processed = new HashSet>(); + for (Operator op: childrenOfDispatch) { + ArrayList> parents = new ArrayList>(); + for (Operator oldParent: op.getParentOperators()) { + if (!correlation.getBottomReduceSinkOperators().contains(oldParent)) { + parents.add(oldParent); + } + } + parents.add(dispatchOp); + op.setParentOperators(parents); + } + dispatchOp.setChildOperators(childrenOfDispatch); + HashMap> newTopOps = new HashMap>(); + for (Entry> entry: oldTopOps.entrySet()) { + if (addedTopOps.containsKey(entry.getKey())) { + newTopOps.put(entry.getKey(), addedTopOps.get(entry.getKey())); + } else { + newTopOps.put(entry.getKey(), entry.getValue()); + } + } + 
pGraphContext.setTopOps(newTopOps); + HashMap newTopToTable = new HashMap(); + for (Entry entry: oldTopToTable.entrySet()) { + if (addedTopToTable.containsKey(oldTSOP2newTSOP.get(entry.getKey()))) { + newTopToTable.put(oldTSOP2newTSOP.get(entry.getKey()), + addedTopToTable.get(oldTSOP2newTSOP.get(entry.getKey()))); + } else { + newTopToTable.put(entry.getKey(), entry.getValue()); + } + } + pGraphContext.setTopToTable(newTopToTable); + + // 6: Change each JFC related ReduceSinkOperator to a CorrelationFakeReduceSinkOperator + LOG.info("apply correlation step 6: Change each JFC related reduce sink operator to a correlation fake reduce sink operator"); + HashMap, ArrayList>> newParentsOfChildren = + new HashMap, ArrayList>>(); + for (ReduceSinkOperator rsop: correlation.getAllReduceSinkOperators()) { + if (!correlation.getBottomReduceSinkOperators().contains(rsop)) { + Operator childOP = rsop.getChildOperators().get(0); + Operator parentOP = rsop.getParentOperators().get(0); + Operator correlationFakeReduceSinkOperator = putOpInsertMap(OperatorFactory.get( + new CorrelationFakeReduceSinkDesc(rsop.getConf()), + new RowSchema(pGraphContext.getOpParseCtx().get(rsop).getRowResolver().getColumnInfos())), + pGraphContext.getOpParseCtx().get(rsop).getRowResolver(), pGraphContext.getOpParseCtx()); + correlationFakeReduceSinkOperator.setChildOperators(Utilities.makeList(childOP)); + correlationFakeReduceSinkOperator.setParentOperators(Utilities.makeList(parentOP)); + parentOP.getChildOperators().set(parentOP.getChildOperators().indexOf(rsop), correlationFakeReduceSinkOperator); + childOP.getParentOperators().set(childOP.getParentOperators().indexOf(rsop), correlationFakeReduceSinkOperator); + } + } + + return pGraphContext; + } + + public static Operator createCorrelationCompositeReducesinkOperaotr( + ArrayList tsops, ArrayList rsops, + IntraQueryCorrelation correlation, + Operator input, ParseContext pGraphContext, + ArrayList> childrenOfDispatch, String tableName, + LinkedHashMap, Map> originalOpColumnExprMap, int newTag, + LinkedHashMap, RowResolver> originalOpRowResolver) { + + // Create CorrelationCompositeOperator + RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver(); + ArrayList> tops = new ArrayList>(); + ArrayList> bottoms = new ArrayList>(); + ArrayList opTags = new ArrayList(); + + for (ReduceSinkOperator rsop: rsops) { + TableScanOperator tsop = correlation.getBottom2TSops().get(rsop).get(0); + Operator curr = tsop.getChildOperators().get(0); + if (curr == rsop) { + // no filter needed, just forward + ForwardDesc forwardCtx = new ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + } else { + // Add filter operator + FilterOperator currFilOp = null; + while(curr != rsop) { + if (curr.getName().equals("FIL")) { + FilterOperator fil = (FilterOperator)curr; + FilterDesc filterCtx = new FilterDesc(fil.getConf().getPredicate(), false); + Operator nowFilOp = OperatorFactory.get(FilterDesc.class); + nowFilOp.setConf(filterCtx); + if (currFilOp == null) { + currFilOp = (FilterOperator)nowFilOp; + tops.add(currFilOp); + } else { + nowFilOp.setParentOperators(Utilities.makeList(currFilOp)); + currFilOp.setChildOperators(Utilities.makeList(nowFilOp)); + currFilOp = (FilterOperator) nowFilOp; + } + } + curr = curr.getChildOperators().get(0); + } + if (currFilOp == null) { + ForwardDesc 
forwardCtx = new ForwardDesc(); + Operator forwardOp = OperatorFactory.get(ForwardDesc.class); + forwardOp.setConf(forwardCtx); + tops.add(forwardOp); + bottoms.add(forwardOp); + } else { + bottoms.add(currFilOp); + } + opTags.add(correlation.getBottomReduceSink2OperationPathMap().get(rsop)); + + } + } + + int[] opTagsArray = new int[opTags.size()]; + for (int i=0; i ycop = putOpInsertMap(OperatorFactory.getAndMakeChild(ycoCtx, + new RowSchema(inputRR.getColumnInfos()), input), + inputRR, pGraphContext.getOpParseCtx()); + + // Create CorrelationReduceSinkOperator + ArrayList partitionCols = new ArrayList(); + ArrayList keyCols = new ArrayList(); + Map colExprMap = new HashMap(); + ArrayList keyOutputColumnNames = new ArrayList(); + ReduceSinkOperator firstRsop = rsops.get(0); + + RowResolver firstRsopRS = pGraphContext.getOpParseCtx().get(firstRsop).getRowResolver(); + RowResolver orginalFirstRsopRS = originalOpRowResolver.get(firstRsop); + RowResolver outputRS = new RowResolver(); + HashMap keyCol2ExprForDispatch = new HashMap(); + HashMap valueCol2ExprForDispatch = new HashMap(); + + for (ExprNodeDesc expr: firstRsop.getConf().getKeyCols()) { + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr; + String ouputName = getColumnName(originalOpColumnExprMap.get(firstRsop), expr); + ColumnInfo cinfo = orginalFirstRsopRS.getColumnInfos().get(orginalFirstRsopRS.getPosition(ouputName)); + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size()); + keyOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + + colExprMap.put(newColInfo.getInternalName(), expr); + + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + keyCols.add(expr); + + keyCol2ExprForDispatch.put(encd.getColumn(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName, + encd.getIsPartitionColOrVirtualCol())); + + } + + ArrayList valueCols = new ArrayList(); + ArrayList valueOutputColumnNames = new ArrayList(); + + correlation.addOperationPathToDispatchConf(newTag); + correlation.addOperationPathToDispatchKeySelectDescConf(newTag); + correlation.addOperationPathToDispatchValueSelectDescConf(newTag); + + + for (ReduceSinkOperator rsop: rsops) { + RowResolver rs = pGraphContext.getOpParseCtx().get(rsop).getRowResolver(); + RowResolver orginalRS = originalOpRowResolver.get(rsop); + Integer childOpIndex = childrenOfDispatch.indexOf(rsop.getChildOperators().get(0)); + int outputTag = rsop.getConf().getTag(); + if (outputTag == -1) { + outputTag = 0; + } + if (!correlation.getDispatchConfForOperationPath(newTag).containsKey(childOpIndex)) { + correlation.getDispatchConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchConfForOperationPath(newTag).get(childOpIndex).add(outputTag); + + ArrayList thisKeyColsInDispatch = new ArrayList(); + ArrayList outputKeyNamesInDispatch = new ArrayList(); + for (ExprNodeDesc expr: rsop.getConf().getKeyCols()) { + assert expr instanceof ExprNodeColumnDesc; + ExprNodeColumnDesc encd = (ExprNodeColumnDesc)expr; + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + thisKeyColsInDispatch.add(keyCol2ExprForDispatch.get(encd.getColumn())); + String[] names = outputName.split("\\."); + outputKeyNamesInDispatch.add(names[1]); + } + + if (!correlation.getDispatchKeySelectDescConfForOperationPath(newTag).containsKey(childOpIndex)) { + 
correlation.getDispatchKeySelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchKeySelectDescConfForOperationPath(newTag).get(childOpIndex). + add(new SelectDesc(thisKeyColsInDispatch, outputKeyNamesInDispatch, false)); + + ArrayList thisValueColsInDispatch = new ArrayList(); + ArrayList outputValueNamesInDispatch = new ArrayList(); + for (ExprNodeDesc expr: rsop.getConf().getValueCols()) { + + String outputName = getColumnName(originalOpColumnExprMap.get(rsop), expr); + ColumnInfo cinfo = orginalRS.getColumnInfos().get(orginalRS.getPosition(outputName)); + if (!valueCol2ExprForDispatch.containsKey(expr.getExprString())) { + + String col = SemanticAnalyzer.getColumnInternalName(keyCols.size() + valueCols.size()); + valueOutputColumnNames.add(col); + ColumnInfo newColInfo = new ColumnInfo(col, cinfo.getType(), tableName, cinfo + .getIsVirtualCol(), cinfo.isHiddenVirtualCol()); + colExprMap.put(newColInfo.getInternalName(), expr); + outputRS.put(tableName, newColInfo.getInternalName(), newColInfo); + valueCols.add(expr); + + valueCol2ExprForDispatch.put(expr.getExprString(), new ExprNodeColumnDesc(cinfo.getType(), col, tableName, + false)); + } + + thisValueColsInDispatch.add(valueCol2ExprForDispatch.get(expr.getExprString())); + String[] names = outputName.split("\\."); + outputValueNamesInDispatch.add(names[1]); + } + + if (!correlation.getDispatchValueSelectDescConfForOperationPath(newTag).containsKey(childOpIndex)) { + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).put(childOpIndex, new ArrayList()); + } + correlation.getDispatchValueSelectDescConfForOperationPath(newTag).get(childOpIndex). + add(new SelectDesc(thisValueColsInDispatch, outputValueNamesInDispatch, false)); + } + + ReduceSinkOperator rsOp = null; + try { + rsOp = (ReduceSinkOperator) putOpInsertMap( + OperatorFactory.getAndMakeChild(getReduceSinkDesc(keyCols, + keyCols.size(), valueCols, new ArrayList>(), + keyOutputColumnNames, valueOutputColumnNames, true, newTag, keyCols.size(), + -1), new RowSchema(outputRS + .getColumnInfos()), ycop), outputRS, pGraphContext.getOpParseCtx()); + rsOp.setColumnExprMap(colExprMap); + ((CorrelationCompositeOperator)ycop).getConf().setCorrespondingReduceSinkOperator(rsOp); + } catch (SemanticException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return rsOp; + } + + + /** + * Create the correlation reduce sink descriptor. + * + * @param keyCols + * The columns to be stored in the key + * @param numKeys number of distribution keys. Equals to group-by-key + * numbers usually. + * @param valueCols + * The columns to be stored in the value + * @param distinctColIndices + * column indices for distinct aggregates + * @param outputKeyColumnNames + * The output key columns names + * @param outputValueColumnNames + * The output value columns names + * @param tag + * The tag for this reducesink + * @param numPartitionFields + * The first numPartitionFields of keyCols will be partition columns. + * If numPartitionFields=-1, then partition randomly. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The YSmartReduceSinkDesc object. 
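+ *
+ * For example (an illustrative sketch; column names are hypothetical): with
+ * keyCols = (c1, c2), numPartitionFields >= 2 partitions on both c1 and c2,
+ * numPartitionFields = 1 partitions on c1 only, and numPartitionFields = -1
+ * partitions on a rand() expression, i.e. rows are spread randomly over reducers.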
+ */ + public static ReduceSinkDesc getReduceSinkDesc( + ArrayList keyCols, int numKeys, + ArrayList valueCols, + List> distinctColIndices, + List outputKeyColumnNames, List outputValueColumnNames, + boolean includeKey, int tag, + int numPartitionFields, int numReducers) throws SemanticException { + ArrayList partitionCols = null; + + if (numPartitionFields >= keyCols.size()) { + partitionCols = keyCols; + } else if (numPartitionFields >= 0) { + partitionCols = new ArrayList(numPartitionFields); + for (int i = 0; i < numPartitionFields; i++) { + partitionCols.add(keyCols.get(i)); + } + } else { + // numPartitionFields = -1 means random partitioning + partitionCols = new ArrayList(1); + partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor + .getFuncExprNodeDesc("rand")); + } + + StringBuilder order = new StringBuilder(); + for (int i = 0; i < keyCols.size(); i++) { + order.append("+"); + } + + TableDesc keyTable = null; + TableDesc valueTable = null; + ArrayList outputKeyCols = new ArrayList(); + ArrayList outputValCols = new ArrayList(); + if (includeKey) { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnListWithLength( + keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""), + order.toString()); + outputKeyCols.addAll(outputKeyColumnNames); + } else { + keyTable = PlanUtils.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList( + keyCols, "reducesinkkey"),order.toString()); + for (int i = 0; i < keyCols.size(); i++) { + outputKeyCols.add("reducesinkkey" + i); + } + } + valueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueCols, outputValueColumnNames, 0, "")); + outputValCols.addAll(outputValueColumnNames); + + return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols, + distinctColIndices, outputValCols, + tag, partitionCols, numReducers, keyTable, + valueTable, true); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationManualForwardOperator.java (revision 0) @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationManualForwardDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; + +/** + * Correlation Manual Forward Operator. Collects a row and forwards it on request. + * CorrelationManualForwardOperator is a sub-operator within CorrelationCompositeOperator. 
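+ *
+ * A typical interaction (illustrative sketch): the enclosing CorrelationCompositeOperator
+ * pushes a row into this operator through processOp(row, tag), inspects the buffered row via
+ * getRow()/getTag(), may re-emit it to the children with forwardIt(), and finally resets the
+ * buffer with clean().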
+ **/ +public class CorrelationManualForwardOperator extends Operator implements + Serializable { + private static final long serialVersionUID = 1L; + + private Object row; + private int tag; + + public Object getRow() { + return this.row; + } + + public int getTag() { + return this.tag; + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + this.row = row; + this.tag = tag; + } + + public void forwardIt() throws HiveException { + forward(row, inputObjInspectors[tag]); + } + + public void clean() { + this.row = null; + this.tag = -1; + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CMF"); + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (revision 0) @@ -0,0 +1,418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; + +/** + * Correlation dispatch operator implementation. + * If used, CorrelationReducerDispatchOperator is the first operator in reduce phase. + * It will dispatch the record to corresponding JOIN or GBY operators. 
+ * Suppose there are n children of this dispatch operator; an input record will be + * evaluated by n DispatchHandlers, each of which selects the corresponding parts of the record + * and forwards them to the succeeding JOIN or GBY operators. + */ +public class CorrelationReducerDispatchOperator extends Operator implements Serializable{ + + private static final long serialVersionUID = 1L; + private static String[] fieldNames; + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + protected static class DispatchHandler { + + protected Log l4j = LogFactory.getLog(this.getClass().getName()); + + private final ObjectInspector[] inputObjInspector; + private ObjectInspector outputObjInspector; + private ObjectInspector keyObjInspector; + private ObjectInspector valueObjInspector; + private final byte inputTag; + private final byte outputTag; + private final byte childIndx; + private final ByteWritable outputTagByteWritable; + private final SelectDesc selectDesc; + private final SelectDesc keySelectDesc; + private ExprNodeEvaluator[] keyEval; + private ExprNodeEvaluator[] eval; + + // counters for debugging + private transient long cntr = 0; + private transient long nextCntr = 1; + + private long getNextCntr(long cntr) { + // A very simple counter to keep track of number of rows processed by an + // operator. It dumps + // every 1 million times, and quickly before that + if (cntr >= 1000000) { + return cntr + 1000000; + } + + return 10 * cntr; + } + + public long getCntr() { + return this.cntr; + } + + private final Log LOG; + private final boolean isLogInfoEnabled; + private final String id; + + public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx, byte outputTag, + SelectDesc selectDesc, SelectDesc keySelectDesc, Log LOG, String id) + throws HiveException { + this.inputObjInspector = inputObjInspector; + assert this.inputObjInspector.length == 1; + this.inputTag = inputTag; + this.childIndx = childIndx; + this.outputTag = outputTag; + this.selectDesc = selectDesc; + this.keySelectDesc = keySelectDesc; + this.outputTagByteWritable = new ByteWritable(outputTag); + this.LOG = LOG; + this.isLogInfoEnabled = LOG.isInfoEnabled(); + this.id = id; + init(); + } + + private void init() throws HiveException { + ArrayList ois = new ArrayList(); + if (keySelectDesc.isSelStarNoCompute()) { + ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(0)); + } else { + ArrayList colList = this.keySelectDesc.getColList(); + keyEval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert (colList.get(k) != null); + keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + keyObjInspector = initEvaluatorsAndReturnStruct(keyEval, keySelectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(0).getFieldObjectInspector()); + + ois.add(keyObjInspector); + l4j.info("Key: input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); + + } + if (selectDesc.isSelStarNoCompute()) { + ois.add((ObjectInspector) ((ArrayList)inputObjInspector[0]).get(1)); + } else { + ArrayList colList = this.selectDesc.getColList(); + eval = new ExprNodeEvaluator[colList.size()]; + for (int k = 0; k < colList.size(); k++) { + assert 
(colList.get(k) != null); + eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k)); + } + valueObjInspector = initEvaluatorsAndReturnStruct(eval, selectDesc + .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0]).getAllStructFieldRefs().get(1).getFieldObjectInspector()); + + ois.add(valueObjInspector); + l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT inputOIForThisTag" + + ((StructObjectInspector) inputObjInspector[0]).getTypeName()); //Yin + + } + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + l4j.info("input tag " + (int)inputTag + ", output tag " + (int)outputTag + ", SELECT outputObjInspector" + + ((StructObjectInspector) outputObjInspector).getTypeName()); //Yin + } + + public ObjectInspector getOutputObjInspector() { + return outputObjInspector; + } + + public Object process(Object row) throws HiveException { + Object[] keyOutput = new Object[keyEval.length]; + Object[] valueOutput = new Object[eval.length]; + ArrayList outputRow = new ArrayList(3); + List thisRow = (List)row; + if (keySelectDesc.isSelStarNoCompute()) { + outputRow.add(thisRow.get(0)); + } else { + Object key = thisRow.get(0); + for (int j = 0; j < keyEval.length; j++) { + try { + keyOutput[j] = keyEval[j].evaluate(key); + + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new HiveException("Error evaluating " + + keySelectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(keyOutput); + } + + if (selectDesc.isSelStarNoCompute()) { + outputRow.add(thisRow.get(1)); + } else { + Object value = thisRow.get(1); + for (int j = 0; j < eval.length; j++) { + try { + valueOutput[j] = eval[j].evaluate(value); + } catch (HiveException e) { + throw e; + } catch (RuntimeException e) { + throw new HiveException("Error evaluating " + + selectDesc.getColList().get(j).getExprString(), e); + } + } + outputRow.add(valueOutput); + } + outputRow.add(outputTagByteWritable); + + if (isLogInfoEnabled) { + cntr++; + if (cntr == nextCntr) { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarding " + cntr + " rows"); + nextCntr = getNextCntr(cntr); + } + } + + return outputRow; + } + + public void printCloseOpLog() { + LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", " + outputTag + "), forwarded " + cntr + " rows"); + } + + } + + //inputTag->(Child->List) + private HashMap>> dispatchConf; + //inputTag->(Child->List) + private HashMap>> dispatchValueSelectDescConf; + //inputTag->(Child->List) + private HashMap>> dispatchKeySelectDescConf; + //inputTag->(Child->List) + private HashMap>> dispatchHandlers; + //Child->(outputTag->DispatchHandler) + private HashMap> child2OutputTag2DispatchHandlers; + //Child->Child's inputObjInspectors + private HashMap childInputObjInspectors; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + + dispatchConf = conf.getDispatchConf(); + dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf(); + dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf(); + dispatchHandlers = new HashMap>>(); + for (Entry>> entry: dispatchConf.entrySet()) { + HashMap> tmp = new HashMap>(); + for (Entry> child2outputTag: entry.getValue().entrySet()) { + tmp.put(child2outputTag.getKey(), new ArrayList()); + int indx = 0; + for 
(Integer outputTag: child2outputTag.getValue()) { + tmp.get(child2outputTag.getKey()).add( + new DispatchHandler(new ObjectInspector[]{inputObjInspectors[entry.getKey()]}, + entry.getKey().byteValue(), child2outputTag.getKey().byteValue(), outputTag.byteValue(), + dispatchValueSelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), + dispatchKeySelectDescConf.get(entry.getKey()).get(child2outputTag.getKey()).get(indx), LOG, id)); + indx++; + } + } + dispatchHandlers.put(entry.getKey(), tmp); + } + + child2OutputTag2DispatchHandlers = new HashMap>(); + for (Entry>> entry: dispatchConf.entrySet()) { + for (Entry> child2outputTag: entry.getValue().entrySet()) { + if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) { + child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(), new HashMap()); + } + int indx = 0; + for (Integer outputTag: child2outputTag.getValue()) { + child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()). + put(outputTag, dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx)); + indx++; + } + + } + } + + childInputObjInspectors = new HashMap(); + for (Entry> entry: + child2OutputTag2DispatchHandlers.entrySet()) { + Integer l = Collections.max(entry.getValue().keySet()); + ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1]; + for (Entry e: entry.getValue().entrySet()) { + if (e.getKey().intValue() == -1) { + assert childObjInspectors.length == 1; + childObjInspectors[0] = e.getValue().getOutputObjInspector(); + } else { + childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector(); + } + } + childInputObjInspectors.put(entry.getKey(), childObjInspectors); + } + + initializeChildren(hconf); + } + + //Each child should has its own outputObjInspector + @Override + protected void initializeChildren(Configuration hconf) throws HiveException { + state = State.INIT; + LOG.info("Operator " + id + " " + getName() + " initialized"); + if (childOperators == null) { + return; + } + LOG.info("Initializing children of " + id + " " + getName()); + for (int i = 0; i < childOperatorsArray.length; i++) { + LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " + + childOperatorsArray[i].getName() + + " " + childInputObjInspectors.get(i).length); + childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i)); + if (reporter != null) { + childOperatorsArray[i].setReporter(reporter); + } + } + } + + private int opTags; + private int inputTag; + + @Override + public void processOp(Object row, int tag) throws HiveException { + ArrayList thisRow = (ArrayList)row; + assert thisRow.size() == 4; + opTags = ((ByteWritable)thisRow.get(3)).get(); + inputTag = (int)((ByteWritable)thisRow.get(2)).get(); + forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]); + } + + @Override + public void forward(Object row, ObjectInspector rowInspector) + throws HiveException { + if ((++outputRows % 1000) == 0) { + if (counterNameToEnum != null) { + incrCounter(numOutputRowsCntr, outputRows); + outputRows = 0; + } + } + + if (childOperatorsArray == null && childOperators != null) { + throw new HiveException( + "Internal Hive error during operator initialization."); + } + + if ((childOperatorsArray == null) || (getDone())) { + return; + } + + int childrenDone = 0; + int forwardFLag = 1; + assert childOperatorsArray.length <= 8; + for (int i = 0; i < childOperatorsArray.length; i++) { + Operator o = childOperatorsArray[i]; + if (o.getDone()) { + 
childrenDone++; + } else { + if ((opTags & (forwardFLag << i)) != 0){ + for(int j = 0; j> childIndx2DispatchHandlers: + dispatchHandlers.values()) { + for (ArrayList dispatchHandlers: childIndx2DispatchHandlers.values()) { + for (DispatchHandler dispatchHandler: dispatchHandlers) { + dispatchHandler.printCloseOpLog(); + } + } + } + } + + @Override + public void setBytesWritableGroupKey(BytesWritable groupKey) { + if (bytesWritableGroupKey == null) { + bytesWritableGroupKey = new BytesWritable(); + } + bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize()); + for (Operator op : childOperators) { + op.setBytesWritableGroupKey(bytesWritableGroupKey); + } + } + + @Override + public void setGroupKeyObject(Object keyObject) { + this.groupKeyObject = keyObject; + for (Operator op : childOperators) { + op.setGroupKeyObject(keyObject); + } + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CDP"); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy) @@ -72,6 +72,7 @@ private Long minSplitSizePerRack; private boolean needsTagging; + private boolean needsOperationPathTagging; private boolean hadoopSupportsSplittable; private MapredLocalWork mapLocalWork; @@ -338,6 +339,16 @@ this.needsTagging = needsTagging; } + // TODO: include "Needs Operation Paths Tagging: false" into correct results + // @Explain(displayName = "Needs Operation Paths Tagging", normalExplain = false) + public boolean getNeedsOperationPathTagging() { + return needsOperationPathTagging; + } + + public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) { + this.needsOperationPathTagging = needsOperationPathTagging; + } + public boolean getHadoopSupportsSplittable() { return hadoopSupportsSplittable; } Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 1205559) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy) @@ -279,6 +279,7 @@ private void populateMapRedPlan3(Table src, Table src2) throws SemanticException { mr.setNumReduceTasks(Integer.valueOf(5)); mr.setNeedsTagging(true); + mr.setNeedsOperationPathTagging(false); ArrayList outputColumns = new ArrayList(); for (int i = 0; i < 2; i++) { outputColumns.add("_col" + i); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (revision 0) @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.io.LongWritable; + +/** + * Correlation composite operator implementation. + * This operator is used only in map phase. + * Suppose that multiple sub-queries involve a common table, + * to share the table scan, CorrelationCompositeOperator will be used. + * For example, suppose that the common table is T and predicates P1 and P2 will be used + * in sub-queries SQ1 and SQ2, respectively. The CorrelationCompositeOperator + * will apply P1 and P2 on the record and tag the record based on if P1 or P2 is true. + **/ +public class CorrelationCompositeOperator extends Operator implements Serializable { + + static final private Log LOG = LogFactory.getLog(Driver.class.getName()); + static final private LogHelper console = new LogHelper(LOG); + + public static enum Counter { + FILTERED, PASSED + } + + /** + * + */ + private static final long serialVersionUID = 1L; + + private ArrayList> bottomInternalOperators; + private ArrayList correlationManualForwardOperators; + + private ReduceSinkOperator correspondingReduceSinkOperators; + + private transient final LongWritable filtered_count, passed_count; + + + public CorrelationCompositeOperator() { + super(); + filtered_count = new LongWritable(); + passed_count = new LongWritable(); + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CCO"); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator(); + bottomInternalOperators = conf.getBottomInternalOperators(); + correlationManualForwardOperators = conf.getCorrelationManualForwardOperators(); + allOperationPathTags = conf.getAllOperationPathTags(); + statsMap.put(Counter.FILTERED, filtered_count); + statsMap.put(Counter.PASSED, passed_count); + // initialize internal operators + for (Operator op:bottomInternalOperators) { + op.initialize(hconf, inputObjInspectors); + } + //initialize its children + initializeChildren(hconf); + } + + private int[] allOperationPathTags; + + @Override + public void processOp(Object row, int tag) throws HiveException { + ArrayList operationPathTags = new ArrayList(); + boolean isForward = false; + for (Operator op: bottomInternalOperators){ + op.process(row, tag); + } + ArrayList rows = new ArrayList(); + int[] tags = new int[bottomInternalOperators.size()]; + int i = 0; + for (CorrelationManualForwardOperator ymf: correlationManualForwardOperators) { + rows.add(ymf.getRow()); + tags[i] = ymf.getTag(); + i++; + } + if (rows.size() > 0){ + i 
= 0; + for (Object r: rows){ + if (r != null){ + operationPathTags.add(allOperationPathTags[i]); + isForward = true; + } + i++; + } + assert correspondingReduceSinkOperators != null; + correspondingReduceSinkOperators.setOperationPathTags(operationPathTags); + } + if (isForward) { + passed_count.set(passed_count.get() + 1); + forward(row, inputObjInspectors[tag]); + } else { + filtered_count.set(filtered_count.get() + 1); + } + for (CorrelationManualForwardOperator ymf: correlationManualForwardOperators) { + ymf.clean(); + } + + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } + +} Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1205559) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -401,6 +401,7 @@ HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), + HIVEOPTCORRELATION("hive.optimize.correlation", false), // exploit intra-query correlations // Indexes HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G Index: ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (working copy) @@ -34,8 +34,8 @@ import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; -import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; @@ -580,7 +580,7 @@ } @Override - protected boolean allInitializedParentsAreClosed() { + public boolean allInitializedParentsAreClosed() { return true; } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy) @@ -195,6 +195,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFParseUrlTuple; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFStack; import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCountDistinct; import org.apache.hadoop.hive.ql.udf.xml.GenericUDFXPath; import org.apache.hadoop.hive.ql.udf.xml.UDFXPathBoolean; import org.apache.hadoop.hive.ql.udf.xml.UDFXPathDouble; @@ -387,6 +388,7 @@ registerGenericUDAF("sum", new GenericUDAFSum()); registerGenericUDAF("count", new GenericUDAFCount()); + registerGenericUDAF("count_distinct", new GenericUDAFCountDistinct()); registerGenericUDAF("avg", new GenericUDAFAverage()); 
registerGenericUDAF("std", new GenericUDAFStd()); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.LineageInfo; @@ -80,7 +81,8 @@ // reducer private Map> groupOpToInputTables; private Map prunedPartitions; - + + Map groupbyRegular2MapSide; /** * The lineage information. */ @@ -157,7 +159,8 @@ HashMap opToSamplePruner, SemanticAnalyzer.GlobalLimitCtx globalLimitCtx, HashMap nameToSplitSample, - HashSet semanticInputs, List> rootTasks) { + HashSet semanticInputs, List> rootTasks, + Map groupbyRegular2MapSide) { this.conf = conf; this.qb = qb; this.ast = ast; @@ -183,6 +186,7 @@ this.globalLimitCtx = globalLimitCtx; this.semanticInputs = semanticInputs; this.rootTasks = rootTasks; + this.groupbyRegular2MapSide = groupbyRegular2MapSide; } /** @@ -529,4 +533,8 @@ this.rootTasks.remove(rootTask); this.rootTasks.addAll(tasks); } + + public Map getGroupbyRegular2MapSide() { + return groupbyRegular2MapSide; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (working copy) @@ -81,6 +81,24 @@ transient protected int numDistributionKeys; transient protected int numDistinctExprs; + + private final ArrayList operationPathTags = new ArrayList(); // operation path tags + private final byte[] operationPathTagsByte = new byte[1]; + + public void setOperationPathTags(ArrayList operationPathTags) { + this.operationPathTags.addAll(operationPathTags); + int operationPathTagsInt = 0; + int tmp = 1; + for (Integer operationPathTag: operationPathTags) { + operationPathTagsInt += tmp << operationPathTag.intValue(); + } + operationPathTagsByte[0] = (byte) operationPathTagsInt; + } + + public ArrayList getOperationPathTags() { + return this.operationPathTags; + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { @@ -267,9 +285,18 @@ keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); - keyWritable.setSize(keyLength + 1); + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.setSize(keyLength + 1); + } else { + keyWritable.setSize(keyLength + 2); + } System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = tagByte[0]; + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.get()[keyLength] = tagByte[0]; + } else { + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } } } else { // Must be BytesWritable @@ -279,9 +306,18 @@ keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); - keyWritable.setSize(keyLength + 1); - System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); - 
keyWritable.get()[keyLength] = tagByte[0]; + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.setSize(keyLength + 1); + } else { + keyWritable.setSize(keyLength + 2); + } + System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); + if (!this.getConf().getNeedsOperationPathTagging()) { + keyWritable.get()[keyLength] = tagByte[0]; + } else { + keyWritable.get()[keyLength] = operationPathTagsByte[0]; + keyWritable.get()[keyLength + 1] = tagByte[0]; + } } } keyWritable.setHashCode(keyHashCode); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; @@ -58,7 +59,6 @@ import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapRedTask; -import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RecordReader; @@ -184,7 +184,7 @@ private List loadTableWork; private List loadFileWork; private Map joinContext; - private final HashMap topToTable; + private HashMap topToTable; private QB qb; private ASTNode ast; private int destTableId; @@ -205,6 +205,9 @@ private final UnparseTranslator unparseTranslator; private final GlobalLimitCtx globalLimitCtx = new GlobalLimitCtx(); + // The mapping from (RS, GBY) parts to corresponding (MapSide-GBY, RS, GBY) + Map groupbyRegular2MapSide; + //prefix for column names auto generated by hive private final String autogenColAliasPrfxLbl; private final boolean autogenColAliasPrfxIncludeFuncName; @@ -285,6 +288,7 @@ autogenColAliasPrfxIncludeFuncName = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME); queryProperties = new QueryProperties(); + groupbyRegular2MapSide = new HashMap(); } @Override @@ -303,6 +307,8 @@ opParseCtx.clear(); groupOpToInputTables.clear(); prunedPartitions.clear(); + topToTable.clear(); + groupbyRegular2MapSide.clear(); } public void init(ParseContext pctx) { @@ -310,6 +316,7 @@ opToPartList = pctx.getOpToPartList(); opToSamplePruner = pctx.getOpToSamplePruner(); topOps = pctx.getTopOps(); + topToTable = pctx.getTopToTable(); topSelOps = pctx.getTopSelOps(); opParseCtx = pctx.getOpParseCtx(); loadTableWork = pctx.getLoadTableWork(); @@ -324,6 +331,7 @@ groupOpToInputTables = pctx.getGroupOpToInputTables(); prunedPartitions = pctx.getPrunedPartitions(); setLineageInfo(pctx.getLineageInfo()); + groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide(); } public ParseContext getParseContext() { @@ -331,7 +339,8 @@ topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks); + opToSamplePruner, globalLimitCtx, nameToSplitSample, 
inputs, rootTasks, + groupbyRegular2MapSide); } @SuppressWarnings("nls") @@ -2908,6 +2917,7 @@ ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false); reduceSinkOutputRowResolver.putExpression(parameter, colInfo); numExprs++; + colExprMap.put(colInfo.getInternalName(), expr); } distinctColIndices.add(distinctIndices); } @@ -2925,15 +2935,18 @@ for (int i = 1; i < value.getChildCount(); i++) { ASTNode parameter = (ASTNode) value.getChild(i); if (reduceSinkOutputRowResolver.getExpression(parameter) == null) { - reduceValues.add(genExprNodeDesc(parameter, - reduceSinkInputRowResolver)); + ExprNodeDesc expr = genExprNodeDesc(parameter, + reduceSinkInputRowResolver); + reduceValues.add(expr); outputValueColumnNames .add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." + getColumnInternalName(reduceValues.size() - 1); - reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field, + ColumnInfo colInfo = new ColumnInfo(field, reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, - false)); + false); + reduceSinkOutputRowResolver.putExpression(parameter, colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } } @@ -2945,14 +2958,18 @@ TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get( inputField).getType(); - reduceValues.add(new ExprNodeColumnDesc(type, - getColumnInternalName(inputField), "", false)); + ExprNodeDesc expr = new ExprNodeColumnDesc(type, + getColumnInternalName(inputField), "", false); + reduceValues.add(expr); inputField++; outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1)); String field = Utilities.ReduceField.VALUE.toString() + "." + getColumnInternalName(reduceValues.size() - 1); + ColumnInfo colInfo = new ColumnInfo(field, + type, null, false); reduceSinkOutputRowResolver.putExpression(entry.getValue(), - new ColumnInfo(field, type, null, false)); + colInfo); + colExprMap.put(colInfo.getInternalName(),expr); } } @@ -5893,7 +5910,8 @@ // insert a select operator here used by the ColumnPruner to reduce // the data to shuffle curr = insertSelectAllPlanForGroupBy(dest, curr); - if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + !conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { curr = genGroupByPlanMapAggr1MR(dest, qb, curr); } else { @@ -5902,7 +5920,19 @@ } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { curr = genGroupByPlan2MR(dest, qb, curr); } else { + Operator mapSide = null; + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { + mapSide = genGroupByPlanMapAggr1MR(dest, qb, curr); + mapSide = (Operator)((Operator)mapSide.getParentOperators().get(0)).getParentOperators().get(0); + curr.getChildOperators().remove(mapSide); + } curr = genGroupByPlan1MR(dest, qb, curr); + if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE) && + conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) { + groupbyRegular2MapSide.put((ReduceSinkOperator )curr.getParentOperators().get(0), + (GroupByOperator)mapSide); + } } } @@ -7294,7 +7324,8 @@ opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, - opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, 
rootTasks); + opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, + groupbyRegular2MapSide); Optimizer optm = new Optimizer(); optm.setPctx(pCtx); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationFakeReduceSinkDesc.java (revision 0) @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.List; + +/** + * Correlation Fake ReduceSinkDesc. + * + */ +@Explain(displayName = "Correlation Fake Reduce Output Operator") +public class CorrelationFakeReduceSinkDesc implements Serializable { + private static final long serialVersionUID = 1L; + /** + * Key columns are passed to reducer in the "key". + */ + private java.util.ArrayList keyCols; + private java.util.ArrayList outputKeyColumnNames; + private List> distinctColumnIndices; + /** + * Value columns are passed to reducer in the "value". + */ + private java.util.ArrayList valueCols; + private java.util.ArrayList outputValueColumnNames; + /** + * Describe how to serialize the key. + */ + private TableDesc keySerializeInfo; + /** + * Describe how to serialize the value. + */ + private TableDesc valueSerializeInfo; + + /** + * The tag for this reducesink descriptor. + */ + private int tag; + + /** + * Number of distribution keys. + */ + private int numDistributionKeys; + + /** + * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). + * Partition columns decide the reducer that the current row goes to. + * Partition columns are not passed to reducer. 
+ */ + private java.util.ArrayList partitionCols; + + private int numReducers; + + public CorrelationFakeReduceSinkDesc() { + } + + public CorrelationFakeReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this.keyCols = keyCols; + this.numDistributionKeys = numDistributionKeys; + this.valueCols = valueCols; + this.outputKeyColumnNames = outputKeyColumnNames; + this.outputValueColumnNames = outputValueColumnNames; + this.tag = tag; + this.numReducers = numReducers; + this.partitionCols = partitionCols; + this.keySerializeInfo = keySerializeInfo; + this.valueSerializeInfo = valueSerializeInfo; + this.distinctColumnIndices = distinctColumnIndices; + } + + public CorrelationFakeReduceSinkDesc(ReduceSinkDesc reduceSinkDesc){ + this.keyCols = reduceSinkDesc.getKeyCols(); + this.numDistributionKeys = reduceSinkDesc.getNumDistributionKeys(); + this.valueCols = reduceSinkDesc.getValueCols(); + this.outputKeyColumnNames = reduceSinkDesc.getOutputKeyColumnNames(); + this.outputValueColumnNames = reduceSinkDesc.getOutputValueColumnNames(); + this.tag = reduceSinkDesc.getTag(); + this.numReducers = reduceSinkDesc.getNumReducers(); + this.partitionCols = reduceSinkDesc.getPartitionCols(); + this.keySerializeInfo = reduceSinkDesc.getKeySerializeInfo(); + this.valueSerializeInfo = reduceSinkDesc.getValueSerializeInfo(); + this.distinctColumnIndices = reduceSinkDesc.getDistinctColumnIndices(); + } + + public java.util.ArrayList getOutputKeyColumnNames() { + return outputKeyColumnNames; + } + + public void setOutputKeyColumnNames( + java.util.ArrayList outputKeyColumnNames) { + this.outputKeyColumnNames = outputKeyColumnNames; + } + + public java.util.ArrayList getOutputValueColumnNames() { + return outputValueColumnNames; + } + + public void setOutputValueColumnNames( + java.util.ArrayList outputValueColumnNames) { + this.outputValueColumnNames = outputValueColumnNames; + } + + @Explain(displayName = "key expressions") + public java.util.ArrayList getKeyCols() { + return keyCols; + } + + public void setKeyCols(final java.util.ArrayList keyCols) { + this.keyCols = keyCols; + } + + public int getNumDistributionKeys() { + return this.numDistributionKeys; + } + + public void setNumDistributionKeys(int numKeys) { + this.numDistributionKeys = numKeys; + } + + @Explain(displayName = "value expressions") + public java.util.ArrayList getValueCols() { + return valueCols; + } + + public void setValueCols(final java.util.ArrayList valueCols) { + this.valueCols = valueCols; + } + + @Explain(displayName = "Map-reduce partition columns") + public java.util.ArrayList getPartitionCols() { + return partitionCols; + } + + public void setPartitionCols( + final java.util.ArrayList partitionCols) { + this.partitionCols = partitionCols; + } + + @Explain(displayName = "tag") + public int getTag() { + return tag; + } + + public void setTag(int tag) { + this.tag = tag; + } + + /** + * Returns the number of reducers for the map-reduce job. -1 means to decide + * the number of reducers at runtime. This enables Hive to estimate the number + * of reducers based on the map-reduce input data size, which is only + * available right before we start the map-reduce job. 
+ */ + public int getNumReducers() { + return numReducers; + } + + public void setNumReducers(int numReducers) { + this.numReducers = numReducers; + } + + public TableDesc getKeySerializeInfo() { + return keySerializeInfo; + } + + public void setKeySerializeInfo(TableDesc keySerializeInfo) { + this.keySerializeInfo = keySerializeInfo; + } + + public TableDesc getValueSerializeInfo() { + return valueSerializeInfo; + } + + public void setValueSerializeInfo(TableDesc valueSerializeInfo) { + this.valueSerializeInfo = valueSerializeInfo; + } + + /** + * Returns the sort order of the key columns. + * + * @return null, which means ascending order for all key columns, or a String + * of the same length as key columns, that consists of only "+" + * (ascending order) and "-" (descending order). + */ + @Explain(displayName = "sort order") + public String getOrder() { + return keySerializeInfo.getProperties().getProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); + } + + public void setOrder(String orderStr) { + keySerializeInfo.getProperties().setProperty( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, + orderStr); + } + + public List> getDistinctColumnIndices() { + return distinctColumnIndices; + } + + public void setDistinctColumnIndices( + List> distinctColumnIndices) { + this.distinctColumnIndices = distinctColumnIndices; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (working copy) @@ -70,6 +70,15 @@ public ReduceSinkDesc() { } + private boolean needsOperationPathTagging; + public boolean getNeedsOperationPathTagging() { + return needsOperationPathTagging; + } + + public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) { + this.needsOperationPathTagging = needsOperationPathTagging; + } + public ReduceSinkDesc(java.util.ArrayList keyCols, int numDistributionKeys, java.util.ArrayList valueCols, @@ -78,6 +87,20 @@ java.util.ArrayList outputValueColumnNames, int tag, java.util.ArrayList partitionCols, int numReducers, final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) { + this(keyCols, numDistributionKeys, valueCols, + outputKeyColumnNames, distinctColumnIndices, outputValueColumnNames, tag, + partitionCols, numReducers, keySerializeInfo, valueSerializeInfo, false); + } + + public ReduceSinkDesc(java.util.ArrayList keyCols, + int numDistributionKeys, + java.util.ArrayList valueCols, + java.util.ArrayList outputKeyColumnNames, + List> distinctColumnIndices, + java.util.ArrayList outputValueColumnNames, int tag, + java.util.ArrayList partitionCols, int numReducers, + final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo, + boolean needsOperationPathTagging) { this.keyCols = keyCols; this.numDistributionKeys = numDistributionKeys; this.valueCols = valueCols; @@ -89,6 +112,7 @@ this.keySerializeInfo = keySerializeInfo; this.valueSerializeInfo = valueSerializeInfo; this.distinctColumnIndices = distinctColumnIndices; + this.needsOperationPathTagging = needsOperationPathTagging; } public java.util.ArrayList getOutputKeyColumnNames() { @@ -186,7 +210,7 @@ /** * Returns the sort order of the key columns. 
- * + * * @return null, which means ascending order for all key columns, or a String * of the same length as key columns, that consists of only "+" * (ascending order) and "-" (descending order). @@ -196,7 +220,7 @@ return keySerializeInfo.getProperties().getProperty( org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER); } - + public void setOrder(String orderStr) { keySerializeInfo.getProperties().setProperty( org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER, Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -60,16 +60,16 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; -import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; /** * General utility common functions for the Processor to convert operator into @@ -114,8 +114,14 @@ } if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } + if (op.getConf().getNeedsOperationPathTagging()) { + plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(true); + } + assert currTopOp != null; List> seenOps = opProcCtx.getSeenOps(); String currAliasId = opProcCtx.getCurrAliasId(); @@ -178,6 +184,7 @@ opTaskMap.put(reducer, currTask); if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf(); plan.setNumReduceTasks(desc.getNumReducers()); @@ -308,6 +315,7 @@ if (reducer.getClass() == JoinOperator.class) { plan.setNeedsTagging(true); + plan.setNeedsOperationPathTagging(false); } initUnionPlan(opProcCtx, currTask, false); @@ -329,7 +337,7 @@ if ((taskTmpDirLst == null) || (taskTmpDirLst.isEmpty())) { return; } - + List tt_descLst = uCtx.getTTDesc(); assert !taskTmpDirLst.isEmpty() && !tt_descLst.isEmpty(); assert taskTmpDirLst.size() == tt_descLst.size(); @@ -337,7 +345,7 @@ assert local == false; List> topOperators = uCtx.getListTopOperators(); - + for (int pos = 0; pos < size; pos++) { String taskTmpDir = taskTmpDirLst.get(pos); TableDesc tt_desc = tt_descLst.get(pos); @@ -989,6 +997,7 @@ // dependent on the redTask if (reducer.getClass() == JoinOperator.class) { cplan.setNeedsTagging(true); + cplan.setNeedsOperationPathTagging(false); } } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0) +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java (revision 0) @@ -0,0 +1,801 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QB; +import org.apache.hadoop.hive.ql.parse.QBExpr; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +/** + * Implementation of rule-based correlation optimizer. The optimization is based on + * the paper "YSmart: Yet Another SQL-to-MapReduce Translator" + * (http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-7.pdf). + * This optimizer detects three kinds of + * correlations, Input Correlation (IC), Transit Correlation (TC) and Job Flow Correlation (JFC). 
+ * After the optimization, the structure of a plan tree should be TS->CCO->SEL (select the union of columns + * which are used by the operations)->CRS->CDP->operators in the reduce phase. + */ + +public class CorrelationOptimizer implements Transform { + + static final private Log LOG = LogFactory.getLog(CorrelationOptimizer.class.getName()); + private final HashMap AliastoTabName; + private final HashMap AliastoTab; + + public CorrelationOptimizer() { + super(); + AliastoTabName = new HashMap(); + AliastoTab = new HashMap(); + pGraphContext = null; + } + + private void initializeAliastoTabNameMapping(QB qb) { + for (String alias: qb.getAliases()) { + AliastoTabName.put(alias, qb.getTabNameForAlias(alias)); + AliastoTab.put(alias, qb.getMetaData().getSrcForAlias(alias)); + } + for (String subqalias: qb.getSubqAliases()) { + QBExpr qbexpr = qb.getSubqForAlias(subqalias); + initializeAliastoTabNameMapping(qbexpr.getQB()); + } + } + + public static ExprNodeColumnDesc getStringColumn(String columnName) { + return new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, columnName, + "", false); + } + + protected ParseContext pGraphContext; + private LinkedHashMap, OpParseContext> opParseCtx; + private final LinkedHashMap, OpParseContext> originalOpParseCtx = + new LinkedHashMap, OpParseContext>(); + private final LinkedHashMap, RowResolver> originalOpRowResolver = + new LinkedHashMap, RowResolver>(); + private final LinkedHashMap, Map> originalOpColumnExprMap = + new LinkedHashMap, Map>(); + + private boolean isPhase1 = true; + + private Map groupbyRegular2MapSide; + + /** + * Transform the query tree. First, find correlations among the operators. + * Then, group the correlated operators. + * @param pctx + * current parse context + */ + public ParseContext transform(ParseContext pctx) throws SemanticException { + + if (isPhase1) { + + pGraphContext = pctx; + opParseCtx = pctx.getOpParseCtx(); + + CorrelationNodePhase1ProcCtx correlationCtx = new CorrelationNodePhase1ProcCtx(); + + Map opRules = new LinkedHashMap(); + + Dispatcher disp = new DefaultRuleDispatcher(getPhase1DefaultProc(), opRules, correlationCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + isPhase1 = false; + + } else { + + /* Types of correlations: + * 1) Input Correlation: Multiple nodes have input correlation + (IC) if their input relation sets are not disjoint; + 2) Transit Correlation: Multiple nodes have transit correlation + (TC) if they have not only input correlation, but + also the same partition key; + 3) Job Flow Correlation: A node has job flow correlation + (JFC) with one of its child nodes if it has the same + partition key as that child node.
+ * */ + + pGraphContext = pctx; + opParseCtx = pctx.getOpParseCtx(); + + groupbyRegular2MapSide = pctx.getGroupbyRegular2MapSide(); + + initializeAliastoTabNameMapping(pGraphContext.getQB()); + + // 1: find out correlations + CorrelationNodeProcCtx correlationCtx = new CorrelationNodeProcCtx(); + + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "RS%"), new CorrelationNodeProc()); + + Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, correlationCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pGraphContext.getTopOps().values()); + ogw.startWalking(topNodes, null); + + // 2: transform the query plan tree + LOG.info("Begin query plan transformation based on intra-query correlations"); + for (IntraQueryCorrelation correlation: correlationCtx.getCorrelations()) { + pGraphContext = CorrelationOptimizerUtils.applyCorrelation( + correlation, pGraphContext, originalOpColumnExprMap, originalOpRowResolver, + groupbyRegular2MapSide, originalOpParseCtx); + } + LOG.info("Finished query plan transformation based on intra-query correlations"); + + } + + return pGraphContext; + } + + + + private NodeProcessor getPhase1DefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + Operator op = (Operator)nd; + OpParseContext opCtx = opParseCtx.get(op); + + if (op.getColumnExprMap() != null) { + originalOpColumnExprMap.put(op, op.getColumnExprMap()); + } + originalOpParseCtx.put(op, opCtx); + originalOpRowResolver.put(op, opCtx.getRowResolver()); + + return null; + } + }; + } + + private class CorrelationNodeProc implements NodeProcessor { + + public ReduceSinkOperator findNextChildReduceSinkOperator(ReduceSinkOperator rsop) { + Operator op = rsop.getChildOperators().get(0); + while(!op.getName().equals("RS")) { + if (op.getName().equals("FS")) { + return null; + } + assert op.getChildOperators().size() <= 1; + op = op.getChildOperators().get(0); + } + return (ReduceSinkOperator)op; + } + + /** + * Find all peer ReduceSinkOperators of op, i.e., those ReduceSinkOperators that share the same child operator as op (op included).
+ */ + private ArrayList findPeerReduceSinkOperators(ReduceSinkOperator op) { + + ArrayList peerReduceSinkOperators = new ArrayList(); + + List> children = op.getChildOperators(); + assert children.size() == 1; + + for (Operator parent: children.get(0).getParentOperators()) { + assert (parent instanceof ReduceSinkOperator); + peerReduceSinkOperators.add((ReduceSinkOperator)parent); + } + + return peerReduceSinkOperators; + } + + private ArrayList findCorrelatedReduceSinkOperators(Operator op, + HashSet keyColumns, IntraQueryCorrelation correlation) throws Exception{ + + LOG.info("now detecting operator " + op.getIdentifier() + " " + op.getName()); + + ArrayList correlatedReduceSinkOps = new ArrayList(); + if (op.getParentOperators() == null) { + return correlatedReduceSinkOps; + } + if (originalOpColumnExprMap.get(op) == null && !(op instanceof ReduceSinkOperator)) { + assert op.getParentOperators().size() == 1; + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators( + (Operator)op.getParentOperators().get(0), keyColumns, correlation)); + } else if (originalOpColumnExprMap.get(op) != null && !(op instanceof ReduceSinkOperator)) { + HashSet newKeyColumns = new HashSet(); + for (String keyColumn: keyColumns) { + ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn); + if (col instanceof ExprNodeColumnDesc) { + newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn()); + } + } + + if (op.getName().equals("JOIN")) { + HashSet tableNeedToCheck = new HashSet(); + for (String keyColumn: keyColumns) { + for (ColumnInfo cinfo: originalOpParseCtx.get(op).getRowResolver().getColumnInfos()) { + if (keyColumn.equals(cinfo.getInternalName())) { + tableNeedToCheck.add(cinfo.getTabAlias()); + } + } + } + + for (Object parent: op.getParentOperators()) { + assert originalOpParseCtx.get(parent).getRowResolver().getTableNames().size() == 1; + for (String tbl: originalOpParseCtx.get(parent).getRowResolver().getTableNames()) { + if (tableNeedToCheck.contains(tbl)) { + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)parent, newKeyColumns, correlation)); + break; + } + } + } + + } else { + assert op.getParentOperators().size() == 1; + correlatedReduceSinkOps.addAll(findCorrelatedReduceSinkOperators((Operator)op.getParentOperators().get(0), newKeyColumns, correlation)); + } + + } else if (originalOpColumnExprMap.get(op) != null && op instanceof ReduceSinkOperator) { + + HashSet newKeyColumns = new HashSet(); + for (String keyColumn: keyColumns) { + ExprNodeDesc col = (ExprNodeDesc) originalOpColumnExprMap.get(op).get(keyColumn); + if (col instanceof ExprNodeColumnDesc) { + newKeyColumns.add(((ExprNodeColumnDesc)col).getColumn()); + } + } + + ReduceSinkOperator rsop = (ReduceSinkOperator)op; + HashSet thisKeyColumns = new HashSet(); + for (ExprNodeDesc key: rsop.getConf().getKeyCols()) { + if (key instanceof ExprNodeColumnDesc) { + thisKeyColumns.add(((ExprNodeColumnDesc)key).getColumn()); + } + } + + // if the intersection of newKeyColumns and thisKeyColumns is not empty, isCorrelated is true + boolean isCorrelated = false; + // TODO: should use if intersection is empty to evaluate if two corresponding operators are correlated + Set intersection = new HashSet(newKeyColumns); + intersection.retainAll(thisKeyColumns); + isCorrelated = !(intersection.isEmpty()); + + ReduceSinkOperator nextChildReduceSinkOperator = findNextChildReduceSinkOperator(rsop); + + if (isCorrelated) { + if 
(nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("JOIN")) { + if (intersection.size() != nextChildReduceSinkOperator.getConf().getKeyCols().size() || + intersection.size() != rsop.getConf().getKeyCols().size()) { + isCorrelated = false; + } + } + } + + if (isCorrelated) { + if (((Operator)(op.getChildOperators().get(0))).getName().equals("JOIN")) { + ArrayList peers = findPeerReduceSinkOperators(rsop); + correlatedReduceSinkOps.addAll(peers); + } else { + correlatedReduceSinkOps.add(rsop); + } + if (nextChildReduceSinkOperator.getChildOperators().get(0).getName().equals("GBY") && + (intersection.size() < nextChildReduceSinkOperator.getConf().getKeyCols().size())) { + correlation.addToRSGBYToBeReplacedByGBYRSGBY(nextChildReduceSinkOperator); + } + + } else { + correlatedReduceSinkOps.clear(); + correlation.getRSGBYToBeReplacedByGBYRSGBY().clear(); + } + } else { + throw new Exception("Correlation optimizer: ReduceSinkOperator " + op.getIdentifier() + " does not have ColumnExprMap"); + } + return correlatedReduceSinkOps; + } + + private ArrayList exploitJFC(ReduceSinkOperator op, CorrelationNodeProcCtx correlationCtx, IntraQueryCorrelation correlation) { + + correlationCtx.addWalked(op); + correlation.addToAllReduceSinkOperators(op); + + ArrayList ReduceSinkOperators = new ArrayList(); + + boolean sholdDetect = true; + + ArrayList keys = op.getConf().getKeyCols(); + HashSet keyColumns = new HashSet(); + for (ExprNodeDesc key: keys) { + if (!(key instanceof ExprNodeColumnDesc)) { + sholdDetect = false; + } else { + keyColumns.add(((ExprNodeColumnDesc)key).getColumn()); + } + } + + if (sholdDetect) { + ArrayList newReduceSinkOperators = new ArrayList(); + for (Operator parent: op.getParentOperators()) { + try { + ArrayList correlatedReduceSinkOperators = + findCorrelatedReduceSinkOperators(parent, keyColumns, correlation); + if (correlatedReduceSinkOperators == null || correlatedReduceSinkOperators.size() == 0) { + newReduceSinkOperators.add(op); + } else { + ArrayList deduplicatedCorrelatedReduceSinkOperators = new ArrayList(); + for (ReduceSinkOperator rsop: correlatedReduceSinkOperators) { + if (!deduplicatedCorrelatedReduceSinkOperators.contains(rsop)) { + deduplicatedCorrelatedReduceSinkOperators.add(rsop); + } + } + for (ReduceSinkOperator rsop: deduplicatedCorrelatedReduceSinkOperators) { + if ( !correlation.getUp2downRSops().containsKey(op) ) { + correlation.getUp2downRSops().put(op, new ArrayList()); + } + correlation.getUp2downRSops().get(op).add(rsop); + + if ( !correlation.getDown2upRSops().containsKey(rsop)) { + correlation.getDown2upRSops().put(rsop, new ArrayList()); + } + correlation.getDown2upRSops().get(rsop).add(op); + ArrayList exploited = exploitJFC(rsop, correlationCtx, correlation); + if (exploited == null || exploited.size() == 0) { + newReduceSinkOperators.add(rsop); + } else { + newReduceSinkOperators.addAll(exploited); + } + } + } + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + ReduceSinkOperators.clear(); + ReduceSinkOperators.addAll(newReduceSinkOperators); + } + return ReduceSinkOperators; + } + + private TableScanOperator findTableScanOPerator(Operator startPoint) { + Operator thisOp = (Operator) startPoint.getParentOperators().get(0); + while(true) { + if (thisOp.getName().equals("RS")) { + return null; + } else if (thisOp.getName().equals("TS")) { + return (TableScanOperator)thisOp; + } + else { + if (thisOp.getParentOperators() != null) { + thisOp = (Operator) 
thisOp.getParentOperators().get(0); + } + else { + break; + } + } + } + return null; + } + + private void annotateOpPlan(IntraQueryCorrelation correlation) { + HashMap bottomReduceSink2OpPlanMap = new HashMap(); + int count = 0; + + for (ReduceSinkOperator rsop: correlation.getBottomReduceSinkOperators()) { + if (!bottomReduceSink2OpPlanMap.containsKey(rsop)) { + bottomReduceSink2OpPlanMap.put(rsop, count); + for (ReduceSinkOperator peerRSop: findPeerReduceSinkOperators(rsop)) { + if (correlation.getBottomReduceSinkOperators().contains(peerRSop)) { + bottomReduceSink2OpPlanMap.put(peerRSop, count); + } + } + count++; + } + } + correlation.setBottomReduceSink2OperationPathMap(bottomReduceSink2OpPlanMap); + } + + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, + Object... nodeOutputs) throws SemanticException { + + CorrelationNodeProcCtx correlationCtx = (CorrelationNodeProcCtx)ctx; + + ReduceSinkOperator op = (ReduceSinkOperator)nd; + + if (correlationCtx.isWalked(op)) { + return null; + } + + if (op.getConf().getKeyCols().size() == 0 || + (!op.getChildOperators().get(0).getName().equals("JOIN") && + !op.getChildOperators().get(0).getName().equals("GBY"))) { + correlationCtx.addWalked(op); + return null; + } + + // 1: find out correlation + IntraQueryCorrelation correlation = new IntraQueryCorrelation(); + ArrayList peerReduceSinkOperators = findPeerReduceSinkOperators(op); + ArrayList bottomReduceSinkOperators = new ArrayList(); + for (ReduceSinkOperator rsop: peerReduceSinkOperators) { + + ArrayList thisBottomReduceSinkOperators= exploitJFC(rsop, correlationCtx, correlation); + + if (peerReduceSinkOperators.size() == 1) { + correlation.addToRSGBYToBeReplacedByGBYRSGBY(rsop); + } + if (thisBottomReduceSinkOperators.size() == 0) { + thisBottomReduceSinkOperators.add(rsop); + } else { + boolean isClear = false; + for (ReduceSinkOperator bottomrsop: thisBottomReduceSinkOperators) { + TableScanOperator tsop = findTableScanOPerator(bottomrsop); + if (tsop == null) { + isClear = true; // currently the optimizer can only optimize correlations involving source tables + } else { + if ( !correlation.getTop2TSops().containsKey(rsop) ) { + correlation.getTop2TSops().put(rsop, new ArrayList()); + } + correlation.getTop2TSops().get(rsop).add(tsop); + + if ( !correlation.getBottom2TSops().containsKey(bottomrsop)) { + correlation.getBottom2TSops().put(bottomrsop, new ArrayList()); + } + correlation.getBottom2TSops().get(bottomrsop).add(tsop); + } + } + if (isClear) { + thisBottomReduceSinkOperators.clear(); + thisBottomReduceSinkOperators.add(rsop); + } + } + bottomReduceSinkOperators.addAll(thisBottomReduceSinkOperators); + } + + if (!peerReduceSinkOperators.containsAll(bottomReduceSinkOperators)) { + LOG.info("has job flow correlation"); + correlation.setJobFlowCorrelation(true); + correlation.setJFCCorrelation(peerReduceSinkOperators, bottomReduceSinkOperators); + annotateOpPlan(correlation); + } + + if (correlation.hasJobFlowCorrelation()) { + boolean hasICandTC = findICandTC(correlation); + LOG.info("has input correlation and transit correlation? " + hasICandTC); + correlation.setInputCorrelation(hasICandTC); + correlation.setTransitCorrelation(hasICandTC); + boolean isInvolve = isInvolveSelfJoin(correlation); + LOG.info("involve self-join? " + isInvolve); + correlation.setInvolveSelfJoin(isInvolve); + //TODO: support self-join involved cases. 
For self-join related operation paths, after the correlation dispatch operator, each path should be filtered by a + // filter operator + if (!isInvolve) { + LOG.info("correlation detected"); + correlationCtx.addCorrelation(correlation); + } else { + LOG.info("correlation discarded. The current optimizer cannot optimize it"); + } + } + + correlationCtx.addWalked(op); + + return null; + } + + private boolean isInvolveSelfJoin(IntraQueryCorrelation correlation) { + boolean isInvolve = false; + for (Entry> entry: correlation.getTable2CorrelatedRSops().entrySet()) { + for (ReduceSinkOperator rsop: entry.getValue()) { + HashSet intersection = new HashSet(findPeerReduceSinkOperators(rsop)); + intersection.retainAll(entry.getValue()); + + // if involve self-join + if (intersection.size() > 1) { + isInvolve = true; + return isInvolve; + } + } + } + return isInvolve; + } + + private boolean findICandTC(IntraQueryCorrelation correlation) { + + boolean hasICandTC = false; + HashMap> table2RSops = new HashMap>(); + HashMap> table2TSops = new HashMap>(); + + for (Entry> entry: correlation.getBottom2TSops().entrySet()) { + String tbl = AliastoTabName.get(entry.getValue().get(0).getConf().getAlias()); + if (!table2RSops.containsKey(tbl) && !table2TSops.containsKey(tbl)) { + table2RSops.put(tbl, new ArrayList()); + table2TSops.put(tbl, new ArrayList()); + } + assert entry.getValue().size() == 1; + table2RSops.get(tbl).add(entry.getKey()); + table2TSops.get(tbl).add(entry.getValue().get(0)); + } + + for (Entry> entry: table2RSops.entrySet()) { + if (entry.getValue().size() > 1) { + hasICandTC = true; + } + } + + + correlation.setICandTCCorrelation(table2RSops, table2TSops); + + return hasICandTC; + } + } + + private NodeProcessor getDefaultProc() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + // do nothing + return null; + } + }; + } + + private class CorrelationNodePhase1ProcCtx implements NodeProcessorCtx { + + } + + + public class IntraQueryCorrelation{ + + private final HashMap> down2upRSops = new HashMap>(); + private final HashMap> up2downRSops = new HashMap>(); + + private final HashMap> top2TSops = new HashMap>(); + private final HashMap> bottom2TSops = new HashMap>(); + + private ArrayList topReduceSinkOperators; + private ArrayList bottomReduceSinkOperators; + + private HashMap> table2CorrelatedRSops; + + private HashMap> table2CorrelatedTSops; + + private HashMap bottomReduceSink2OperationPathMap; + + private final HashMap>> dispatchConf = + new HashMap>>(); //inputTag->(Child->outputTag) + private final HashMap>> dispatchValueSelectDescConf = + new HashMap>>(); //inputTag->(Child->SelectDesc) + private final HashMap>> dispatchKeySelectDescConf = + new HashMap>>(); //inputTag->(Child->SelectDesc) + + private final HashSet allReduceSinkOperators = new HashSet(); + + // this set contains all ReduceSink-GroupBy operator-pairs that should be be replaced by GroupBy-ReduceSink-GroupBy pattern. + // the type of first GroupByOperator is hash type and this one will be used to group records. 
+ private final HashSet rSGBYToBeReplacedByGBYRSGBY = new HashSet(); + + public void addToRSGBYToBeReplacedByGBYRSGBY(ReduceSinkOperator rsop) { + rSGBYToBeReplacedByGBYRSGBY.add(rsop); + } + + public HashSet getRSGBYToBeReplacedByGBYRSGBY() { + return rSGBYToBeReplacedByGBYRSGBY; + } + + public void addToAllReduceSinkOperators(ReduceSinkOperator rsop) { + allReduceSinkOperators.add(rsop); + } + + public HashSet getAllReduceSinkOperators() { + return allReduceSinkOperators; + } + + public HashMap>> getDispatchConf() { + return dispatchConf; + } + + public HashMap>> getDispatchValueSelectDescConf() { + return dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return dispatchKeySelectDescConf; + } + + public void addOperationPathToDispatchConf(Integer opPlan) { + if (!dispatchConf.containsKey(opPlan)) { + dispatchConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchConfForOperationPath(Integer opPlan) { + return dispatchConf.get(opPlan); + } + + public void addOperationPathToDispatchValueSelectDescConf(Integer opPlan) { + if (!dispatchValueSelectDescConf.containsKey(opPlan)) { + dispatchValueSelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchValueSelectDescConfForOperationPath(Integer opPlan) { + return dispatchValueSelectDescConf.get(opPlan); + } + + public void addOperationPathToDispatchKeySelectDescConf(Integer opPlan) { + if (!dispatchKeySelectDescConf.containsKey(opPlan)) { + dispatchKeySelectDescConf.put(opPlan, new HashMap>()); + } + } + + public HashMap> getDispatchKeySelectDescConfForOperationPath(Integer opPlan) { + return dispatchKeySelectDescConf.get(opPlan); + } + + private boolean inputCorrelation = false; + private boolean transitCorrelation = false; + private boolean jobFlowCorrelation = false; + + public void setBottomReduceSink2OperationPathMap(HashMap bottomReduceSink2OperationPathMap) { + this.bottomReduceSink2OperationPathMap = bottomReduceSink2OperationPathMap; + } + + public HashMap getBottomReduceSink2OperationPathMap() { + return bottomReduceSink2OperationPathMap; + } + + public void setInputCorrelation(boolean inputCorrelation) { + this.inputCorrelation = inputCorrelation; + } + + public boolean hasInputCorrelation() { + return inputCorrelation; + } + + public void setTransitCorrelation(boolean transitCorrelation) { + this.transitCorrelation = transitCorrelation; + } + + public boolean hasTransitCorrelation() { + return transitCorrelation; + } + + public void setJobFlowCorrelation(boolean jobFlowCorrelation) { + this.jobFlowCorrelation = jobFlowCorrelation; + } + + public boolean hasJobFlowCorrelation() { + return jobFlowCorrelation; + } + + public HashMap> getTop2TSops() { + return top2TSops; + } + + public HashMap> getBottom2TSops() { + return bottom2TSops; + } + + public HashMap> getDown2upRSops() { + return down2upRSops; + } + + public HashMap> getUp2downRSops() { + return up2downRSops; + } + + public void setJFCCorrelation(ArrayList peerReduceSinkOperators, + ArrayList bottomReduceSinkOperators) { + this.topReduceSinkOperators = peerReduceSinkOperators; + this.bottomReduceSinkOperators = bottomReduceSinkOperators; + } + + + public ArrayList getTopReduceSinkOperators() { + return topReduceSinkOperators; + } + + public ArrayList getBottomReduceSinkOperators() { + return bottomReduceSinkOperators; + } + + public void setICandTCCorrelation(HashMap> table2RSops, + HashMap> table2TSops) { + this.table2CorrelatedRSops = table2RSops; + this.table2CorrelatedTSops = table2TSops; + } 
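// Illustrative sketch (hypothetical tag and child-index values, not taken from this patch):
// one way the dispatch information for a single operation path could be registered through
// the accessors defined above, routing operation-path tag 0 to the child operator at index 0
// with output tag 0:
//
//   correlation.addOperationPathToDispatchConf(0);                       // register path 0
//   correlation.getDispatchConfForOperationPath(0)
//       .put(0, new java.util.ArrayList<Integer>(java.util.Arrays.asList(0)));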
+ + public HashMap> getTable2CorrelatedRSops() { + return table2CorrelatedRSops; + } + + public HashMap> getTable2CorrelatedTSops() { + return table2CorrelatedTSops; + } + + private boolean isInvolveSelfJoin = false; + + public boolean isInvolveSelfJoin() { + return isInvolveSelfJoin; + } + + public void setInvolveSelfJoin(boolean isInvolveSelfJoin) { + this.isInvolveSelfJoin = isInvolveSelfJoin; + } + + } + + private class CorrelationNodeProcCtx implements NodeProcessorCtx { + + private final HashSet walked = new HashSet(); + + private final ArrayList correlations = new ArrayList(); + + public void addCorrelation(IntraQueryCorrelation correlation) { + correlations.add(correlation); + } + + public ArrayList getCorrelations() { + return correlations; + } + + + public boolean isWalked(ReduceSinkOperator op) { + return walked.contains(op); + } + + public void addWalked(ReduceSinkOperator op) { + walked.add(op); + } + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationManualForwardDesc.java (revision 0) @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +/** + * Correlation Manual ForwardDesc. + * + */ +@Explain(displayName = "Correlation Manual Forward") +public class CorrelationManualForwardDesc implements Serializable { + private static final long serialVersionUID = 1L; + + public CorrelationManualForwardDesc() { + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationFakeReduceSinkOperator.java (revision 0) @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CorrelationFakeReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Fake Reduce Sink Operator sends output to another operator (e.g. JOIN or GBY). + * CorrelationFakeReduceSinkOperator is used only in the reduce phase. + * Basically, it is a bridge from one JOIN or GBY operator to another JOIN or GBY operator. + * A CorrelationFakeReduceSinkOperator takes care of the startGroup and endGroup actions of its + * succeeding JOIN or GBY operator. + **/ +//TODO: Let JOIN and GBY operators take care of the startGroup, endGroup, and closeOp operations +// of their succeeding JOIN or GBY operators, so that CorrelationFakeReduceSinkOperator can be removed. +public class CorrelationFakeReduceSinkOperator extends Operator + implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * The evaluators for the key columns. Key columns decide the sort order on + * the reducer side. Key columns are passed to the reducer in the "key". + */ + protected transient ExprNodeEvaluator[] keyEval; + /** + * The evaluators for the value columns. Value columns are passed to the reducer + * in the "value". + */ + protected transient ExprNodeEvaluator[] valueEval; + /** + * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in + * Hive language). Partition columns decide the reducer that the current row + * goes to. Partition columns are not passed to the reducer.
+ */ + + // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is + // ready + transient Serializer keySerializer; + transient boolean keyIsText; + transient Serializer valueSerializer; + + transient TableDesc keyTableDesc; + transient TableDesc valueTableDesc; + + transient Deserializer inputKeyDeserializer; + + transient SerDe inputValueDeserializer; + + transient int tag; + transient byte[] tagByte; + transient ByteWritable tagWritable; + transient protected int numDistributionKeys; + transient protected int numDistinctExprs; + + transient InspectableObject tempInspectableObject; + transient HiveKey keyWritable; + transient Writable value; + + transient StructObjectInspector keyObjectInspector; + transient StructObjectInspector valueObjectInspector; + transient ObjectInspector[] partitionObjectInspectors; + + transient ObjectInspector outputKeyObjectInspector; + transient ObjectInspector outputValueObjectInspector; + transient ObjectInspector[] outputPartitionObjectInspectors; + + + //transient BytesWritable groupKey; + transient Object[][] cachedKeys; + transient Object[] cachedValues; + transient List> distinctColIndices; + + transient Random random; + + private ArrayList forwardedRow; + private Object keyObject; + private Object valueObject; + + private static String[] fieldNames; + + static { + ArrayList fieldNameArray = new ArrayList(); + for (Utilities.ReduceField r : Utilities.ReduceField.values()) { + fieldNameArray.add(r.toString()); + } + fieldNames = fieldNameArray.toArray(new String[0]); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + forwardedRow = new ArrayList(3); + tagByte = new byte[1]; + tagWritable = new ByteWritable(); + tempInspectableObject = new InspectableObject(); + keyWritable = new HiveKey(); + assert childOperatorsArray.length == 1; + try { + keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getKeyCols()) { + keyEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + numDistributionKeys = conf.getNumDistributionKeys(); + distinctColIndices = conf.getDistinctColumnIndices(); + numDistinctExprs = distinctColIndices.size(); + + valueEval = new ExprNodeEvaluator[conf.getValueCols().size()]; + i = 0; + for (ExprNodeDesc e : conf.getValueCols()) { + valueEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + tag = conf.getTag(); + tagByte[0] = (byte) tag; + tagWritable.set(tagByte[0]); + LOG.info("Using tag = " + tag); + + TableDesc keyTableDesc = conf.getKeySerializeInfo(); + keySerializer = (Serializer) keyTableDesc.getDeserializerClass() + .newInstance(); + keySerializer.initialize(null, keyTableDesc.getProperties()); + keyIsText = keySerializer.getSerializedClass().equals(Text.class); + + inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + .getDeserializerClass(), null); + inputKeyDeserializer.initialize(null, keyTableDesc.getProperties()); + outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector(); + + TableDesc valueTableDesc = conf.getValueSerializeInfo(); + valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() + .newInstance(); + valueSerializer.initialize(null, valueTableDesc.getProperties()); + + inputValueDeserializer = (SerDe) ReflectionUtils.newInstance( + valueTableDesc.getDeserializerClass(), null); + inputValueDeserializer.initialize(null, valueTableDesc + .getProperties()); + outputValueObjectInspector = inputValueDeserializer.getObjectInspector(); + + ObjectInspector rowInspector = 
inputObjInspectors[0]; + + keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, + distinctColIndices, + conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); + valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf + .getOutputValueColumnNames(), rowInspector); + int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; + int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : + numDistributionKeys; + cachedKeys = new Object[numKeys][keyLen]; + cachedValues = new Object[valueEval.length]; + assert cachedKeys.length == 1; + + ArrayList ois = new ArrayList(); + ois.add(outputKeyObjectInspector); + ois.add(outputValueObjectInspector); + ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + + outputObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois); + + LOG.info("Fake ReduceSink inputObjInspectors" + + ((StructObjectInspector) inputObjInspectors[0]).getTypeName()); + + LOG.info("Fake ReduceSink outputObjInspectors " + + this.getChildOperators().get(0).getParentOperators().indexOf(this) + + " " + ((StructObjectInspector) outputObjInspector).getTypeName()); + + initializeChildren(hconf); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + /** + * Initializes array of ExprNodeEvaluator. Adds Union field for distinct + * column indices for group by. + * Puts the return values into a StructObjectInspector with output column + * names. + * + * If distinctColIndices is empty, the object inspector is same as + * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} + */ + protected static StructObjectInspector initEvaluatorsAndReturnStruct( + ExprNodeEvaluator[] evals, List> distinctColIndices, + List outputColNames, + int length, ObjectInspector rowInspector) + throws HiveException { + int inspectorLen = evals.length > length ? 
length + 1 : evals.length; + List sois = new ArrayList(inspectorLen); + + // keys + ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector); + sois.addAll(Arrays.asList(fieldObjectInspectors)); + + if (evals.length > length) { + // union keys + List uois = new ArrayList(); + for (List distinctCols : distinctColIndices) { + List names = new ArrayList(); + List eois = new ArrayList(); + int numExprs = 0; + for (int i : distinctCols) { + names.add(HiveConf.getColumnInternalName(numExprs)); + eois.add(evals[i].initialize(rowInspector)); + numExprs++; + } + uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois)); + } + UnionObjectInspector uoi = + ObjectInspectorFactory.getStandardUnionObjectInspector(uois); + sois.add(uoi); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois ); + } + + private BytesWritable groupKey; + + @Override + public void processOp(Object row, int tag) throws HiveException { + + try { + // Evaluate the value + for (int i = 0; i < valueEval.length; i++) { + cachedValues[i] = valueEval[i].evaluate(row); + } + // Serialize the value + value = valueSerializer.serialize(cachedValues, valueObjectInspector); + valueObject = inputValueDeserializer.deserialize(value); + + // Evaluate the keys + Object[] distributionKeys = new Object[numDistributionKeys]; + for (int i = 0; i < numDistributionKeys; i++) { + distributionKeys[i] = keyEval[i].evaluate(row); + } + + if (numDistinctExprs > 0) { + // with distinct key(s) + for (int i = 0; i < numDistinctExprs; i++) { + System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys); + Object[] distinctParameters = + new Object[distinctColIndices.get(i).size()]; + for (int j = 0; j < distinctParameters.length; j++) { + distinctParameters[j] = + keyEval[distinctColIndices.get(i).get(j)].evaluate(row); + } + cachedKeys[i][numDistributionKeys] = + new StandardUnion((byte)i, distinctParameters); + } + } else { + // no distinct key + System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys); + } + + + for (int i = 0; i < cachedKeys.length; i++) { + + if (keyIsText) { + Text key = (Text) keySerializer.serialize(cachedKeys[i], + keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + + } else { + // Must be BytesWritable + BytesWritable key = (BytesWritable) keySerializer.serialize( + cachedKeys[i], keyObjectInspector); + keyWritable.set(key.getBytes(), 0, key.getLength()); + } + + if (!keyWritable.equals(groupKey)) { + try { + keyObject = inputKeyDeserializer.deserialize(keyWritable); + } catch (Exception e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input key from " + + Utilities.formatBinaryString(keyWritable.get(), 0, + keyWritable.getSize()) + " with properties " + + keyTableDesc.getProperties(), e); + } + if (groupKey == null) { // the first group + groupKey = new BytesWritable(); + } else { + // if its child has not been ended, end it + if(!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())){ + childOperatorsArray[0].endGroup(); + } + } + groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); + if(!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())){ + childOperatorsArray[0].startGroup(); + childOperatorsArray[0].setGroupKeyObject(keyObject); + childOperatorsArray[0].setBytesWritableGroupKey(groupKey); + } + + } + + forwardedRow.clear(); + forwardedRow.add(keyObject); + forwardedRow.add(valueObject); + 
forwardedRow.add(tagWritable); + forward(forwardedRow, outputObjInspector); + } + } catch (SerDeException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if(!abort){ + //if(childOperatorsArray[0].getNumOfClosedParentOperators() == childOperatorsArray[0].getParentOperators().size() - 1){ + if(childOperatorsArray[0].allInitializedParentsAreClosed()) { + childOperatorsArray[0].endGroup(); + } + } + } + + @Override + public void startGroup() throws HiveException { + // do nothing + } + + @Override + public void endGroup() throws HiveException { + // do nothing + } + + @Override + public void setGroupKeyObject(Object keyObject) { + // do nothing + + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return new String("CFReduceSink"); + } + + @Override + public OperatorType getType() { + // TODO Auto-generated method stub + return null; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy) @@ -44,7 +44,14 @@ * @param hiveConf */ public void initialize(HiveConf hiveConf) { + CorrelationOptimizer correlationOptimizer = new CorrelationOptimizer(); transformations = new ArrayList(); + // Add correlation optimizer for first phase query plan tree analysis + // THe first phase will record original opColumnExprMap, opParseCtx, opRowResolver, + // since these may be changed during optimization + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)) { + transformations.add(correlationOptimizer); + } // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) { @@ -74,6 +81,10 @@ if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) { transformations.add(new ReduceSinkDeDuplication()); } + // Correlation discovery and query plan tree transformation phase + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)) { + transformations.add(correlationOptimizer); + } } /** Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCountDistinct.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCountDistinct.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCountDistinct.java (revision 0) @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.LongWritable; + +//TODO: Need test +/** + * This class implements the COUNT_DISTINCT aggregation function as in SQL. + */ +@Description(name = "count_distinct", + value = "_FUNC_(*) - Returns the number of rows for " + + "which the supplied expression(s) are unique and non-NULL.") + +public class GenericUDAFCountDistinct implements GenericUDAFResolver2 { + + private static final Log LOG = LogFactory.getLog(GenericUDAFCountDistinct.class.getName()); + + @Override + public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) + throws SemanticException { + // This method implementation is preserved for backward compatibility. + return new GenericUDAFCountEvaluator(); + } + + @Override + public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo) + throws SemanticException { + + TypeInfo[] parameters = paramInfo.getParameters(); + + if (parameters.length == 0) { + if (!paramInfo.isAllColumns()) { + throw new UDFArgumentException("Argument expected"); + } + assert !paramInfo.isDistinct() : "DISTINCT not supported with *"; + } else { + if (parameters.length > 1 && !paramInfo.isDistinct()) { + throw new UDFArgumentException("DISTINCT keyword must be specified"); + } + assert !paramInfo.isAllColumns() : "* not supported in expression list"; + } + + return new GenericUDAFCountEvaluator().setCountAllColumns( + paramInfo.isAllColumns()); + } + + /** + * GenericUDAFCountEvaluator. 
+ * + */ + public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator { + private boolean countAllColumns = false; + private LongObjectInspector partialCountAggOI; + private LongWritable result; + + private ObjectInspector[] inputOI; + + @Override + public ObjectInspector init(Mode m, ObjectInspector[] parameters) + throws HiveException { + super.init(m, parameters); + inputOI = new ObjectInspector[parameters.length]; + for(int i=0; i bufferSet = new HashSet(); + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + CountDistinctAgg buffer = new CountDistinctAgg(); + reset(buffer); + return buffer; + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + ((CountDistinctAgg) agg).value = 0; + ((CountDistinctAgg) agg).bufferSet.clear(); + } + + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) + throws HiveException { + // parameters == null means the input table/split is empty + if (parameters == null) { + return; + } + if (countAllColumns) { + throw new UDFArgumentException("count_distinct cannot count all columns"); + } else { + assert parameters.length > 0; + boolean countThisRow = true; + Object[] objs = new Object[parameters.length]; + for(int i=0; i>> dispatchConf; + private HashMap>> dispatchValueSelectDescConf; + private HashMap>> dispatchKeySelectDescConf; + + public CorrelationReducerDispatchDesc(){ + this.dispatchConf = new HashMap>>(); + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + + } + + public CorrelationReducerDispatchDesc(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = new HashMap>>(); + this.dispatchKeySelectDescConf = new HashMap>>(); + for(Entry>> entry: this.dispatchConf.entrySet()){ + HashMap> tmp = new HashMap>(); + for(Integer child: entry.getValue().keySet()){ + tmp.put(child, new ArrayList()); + tmp.get(child).add(new SelectDesc(true)); + } + this.dispatchValueSelectDescConf.put(entry.getKey(), tmp); + this.dispatchKeySelectDescConf.put(entry.getKey(), tmp); + } + } + + public CorrelationReducerDispatchDesc(HashMap>> dispatchConf, + HashMap>> dispatchKeySelectDescConf, + HashMap>> dispatchValueSelectDescConf){ + this.dispatchConf = dispatchConf; + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public void setDispatchConf(HashMap>> dispatchConf){ + this.dispatchConf = dispatchConf; + } + + public HashMap>> getDispatchConf(){ + return this.dispatchConf; + } + + public void setDispatchValueSelectDescConf(HashMap>> dispatchValueSelectDescConf){ + this.dispatchValueSelectDescConf = dispatchValueSelectDescConf; + } + + public HashMap>> getDispatchValueSelectDescConf(){ + return this.dispatchValueSelectDescConf; + } + + public void setDispatchKeySelectDescConf(HashMap>> dispatchKeySelectDescConf){ + this.dispatchKeySelectDescConf = dispatchKeySelectDescConf; + } + + public HashMap>> getDispatchKeySelectDescConf() { + return this.dispatchKeySelectDescConf; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (working copy) @@ -61,6 +61,7 @@ private Reporter rp; private boolean abort = false; private boolean isTagged = false; + private 
boolean isOperationPathTagged = false; //If operation path is tagged private long cntr = 0; private long nextCntr = 1; @@ -116,6 +117,7 @@ reducer.setParentOperators(null); // clear out any parents as reducer is the // root isTagged = gWork.getNeedsTagging(); + isOperationPathTagged = gWork.getNeedsOperationPathTagging(); try { keyTableDesc = gWork.getKeyDesc(); inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc @@ -164,8 +166,9 @@ private BytesWritable groupKey; - ArrayList row = new ArrayList(3); + ArrayList row = new ArrayList(4); ByteWritable tag = new ByteWritable(); + ByteWritable operationPathTags = new ByteWritable(); public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { @@ -187,6 +190,14 @@ tag.set(keyWritable.get()[size]); keyWritable.setSize(size); } + + operationPathTags.set((byte)0); + if (isOperationPathTagged) { + // remove the operation plan tag + int size = keyWritable.getSize() - 1; + operationPathTags.set(keyWritable.get()[size]); + keyWritable.setSize(size); + } if (!keyWritable.equals(groupKey)) { // If a operator wants to do some work at the beginning of a group @@ -212,6 +223,7 @@ l4j.trace("Start Group"); reducer.startGroup(); reducer.setGroupKeyObject(keyObject); + reducer.setBytesWritableGroupKey(groupKey); } // System.err.print(keyObject.toString()); while (values.hasNext()) { @@ -234,6 +246,7 @@ row.add(valueObject[tag.get()]); // The tag is not used any more, we should remove it. row.add(tag); + row.add(operationPathTags); if (isLogInfoEnabled) { cntr++; if (cntr == nextCntr) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 1205559) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.OutputCollector; @@ -512,7 +513,7 @@ LOG.debug("End group Done"); } - protected boolean allInitializedParentsAreClosed() { + public boolean allInitializedParentsAreClosed() { if (parentOperators != null) { for (Operator parent : parentOperators) { if(parent==null){ @@ -1335,4 +1336,17 @@ public void cleanUpInputFileChangedOp() throws HiveException { } + protected BytesWritable bytesWritableGroupKey; + + public void setBytesWritableGroupKey(BytesWritable groupKey) { + if (bytesWritableGroupKey == null) { + bytesWritableGroupKey = new BytesWritable(); + } + bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize()); + } + + public BytesWritable getBytesWritableGroupKey() { + return bytesWritableGroupKey; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (revision 0) @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.CorrelationManualForwardOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; + + +/** + * Correlation composite operator descriptor implementation. + * + */ +@Explain(displayName = "Correlation Composite Operator") +public class CorrelationCompositeDesc implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private ReduceSinkOperator correspondingReduceSinkOperator; + + public CorrelationCompositeDesc(){ + + } + + public CorrelationCompositeDesc(ReduceSinkOperator correspondingReduceSinkOperator){ + this.correspondingReduceSinkOperator = correspondingReduceSinkOperator; + } + + public void setCorrespondingReduceSinkOperator( + ReduceSinkOperator correspondingReduceSinkOperator){ + this.correspondingReduceSinkOperator = correspondingReduceSinkOperator; + } + + public ReduceSinkOperator getCorrespondingReduceSinkOperator(){ + return correspondingReduceSinkOperator; + } + + private int[] allOperationPathTags; + private ArrayList> bottomInternalOperators = new ArrayList>(); + private ArrayList correlationManualForwardOperators = new ArrayList(); + + public void setCorrelationManualForwardOperators( + ArrayList correlationManualForwardOperators){ + this.correlationManualForwardOperators = correlationManualForwardOperators; + } + + public ArrayList getCorrelationManualForwardOperators(){ + return correlationManualForwardOperators; + } + + public void setBottomInternalOperators(ArrayList> bottomInternalOperators){ + this.bottomInternalOperators = bottomInternalOperators; + } + + public ArrayList> getBottomInternalOperators(){ + return bottomInternalOperators; + } + + public void setAllOperationPathTags(int[] allOperationPathTags){ + this.allOperationPathTags = allOperationPathTags; + } + + public int[] getAllOperationPathTags(){ + return allOperationPathTags; + } + + + public void setInternalNodes(ArrayList> bottomInternalOperators, + ArrayList> topInternalOperators, int[] allOperationPathTags){ + this.allOperationPathTags = allOperationPathTags; + // the size of bottomInternalOperators and topInternalOperators should be the same. + this.bottomInternalOperators.addAll(bottomInternalOperators); + for (Operator op: topInternalOperators){ + CorrelationManualForwardOperator cmfo = new CorrelationManualForwardOperator(); + cmfo.setChildOperators(null); + correlationManualForwardOperators.add(cmfo); + List> newChildren = + new ArrayList>(); + newChildren.add(cmfo); + op.setChildOperators(newChildren); + cmfo.setParentOperators(Utilities.makeList(op)); + } + } +}
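As a closing illustration, the sketch below shows one way the descriptor above could be wired by a transformation step, using only the constructor and setInternalNodes defined in this patch. It is a minimal, hypothetical example: the helper class name, the operator lists, and the tag values are assumptions made for illustration, and the Operator type parameter is assumed to be "? extends Serializable".

import java.io.Serializable;
import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;

// Hypothetical helper, for illustration only.
public class CorrelationCompositeDescSketch {
  public static CorrelationCompositeDesc wire(
      ReduceSinkOperator correspondingRS,
      ArrayList<Operator<? extends Serializable>> bottomInternalOps,
      ArrayList<Operator<? extends Serializable>> topInternalOps) {
    // Remember which ReduceSinkOperator this composite operator stands in for.
    CorrelationCompositeDesc desc = new CorrelationCompositeDesc(correspondingRS);
    // setInternalNodes records the operation-path tags (here assumed to be 0 and 1)
    // and appends a CorrelationManualForwardOperator after each top internal operator.
    desc.setInternalNodes(bottomInternalOps, topInternalOps, new int[] {0, 1});
    return desc;
  }
}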