diff --git ql/if/queryplan.thrift ql/if/queryplan.thrift index eafbe5a..dbf35e6 100644 --- ql/if/queryplan.thrift +++ ql/if/queryplan.thrift @@ -56,6 +56,7 @@ enum OperatorType { PTF, MUX, DEMUX, + MERGEJOIN, } struct Operator { diff --git ql/src/gen/thrift/gen-cpp/queryplan_types.cpp ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index 96dbb29..34e9007 100644 --- ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -51,7 +51,8 @@ int _kOperatorTypeValues[] = { OperatorType::HASHTABLEDUMMY, OperatorType::PTF, OperatorType::MUX, - OperatorType::DEMUX + OperatorType::DEMUX, + OperatorType::MERGEJOIN }; const char* _kOperatorTypeNames[] = { "JOIN", @@ -74,9 +75,10 @@ const char* _kOperatorTypeNames[] = { "HASHTABLEDUMMY", "PTF", "MUX", - "DEMUX" + "DEMUX", + "MERGEJOIN" }; -const std::map _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _kTaskTypeValues[] = { TaskType::MAP, diff --git ql/src/gen/thrift/gen-cpp/queryplan_types.h ql/src/gen/thrift/gen-cpp/queryplan_types.h index 634dd55..71b5c88 100644 --- ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -56,7 +56,8 @@ struct OperatorType { HASHTABLEDUMMY = 17, PTF = 18, MUX = 19, - DEMUX = 20 + DEMUX = 20, + MERGEJOIN = 21 }; }; diff --git ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java index aa094ee..b286e06 100644 --- ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java +++ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java @@ -32,7 +32,8 @@ HASHTABLEDUMMY(17), PTF(18), MUX(19), - DEMUX(20); + DEMUX(20), + MERGEJOIN(21); private final int value; @@ -95,6 +96,8 @@ public static OperatorType findByValue(int value) { return MUX; case 20: return DEMUX; + case 21: + return MERGEJOIN; default: return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java index 8c1067e..cbb1786 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -61,29 +62,32 @@ public AbstractMapJoinOperator(AbstractMapJoinOperator mj @Override @SuppressWarnings("unchecked") protected void initializeOp(Configuration hconf) throws HiveException { - int tagLen = conf.getTagLength(); - - joinKeys = new List[tagLen]; - - JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE); - joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys, - inputObjInspectors,NOTSKIPBIGTABLE, tagLen); + boolean mapsideJoin = + (conf instanceof CommonMergeJoinDesc) && 
((CommonMergeJoinDesc) conf).getGenJoinKeys(); + if (mapsideJoin) { + int tagLen = conf.getTagLength(); + joinKeys = new List[tagLen]; + JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE); + joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys, + inputObjInspectors,NOTSKIPBIGTABLE, tagLen); + } super.initializeOp(hconf); - numMapRowsRead = 0; - firstRow = true; + if (mapsideJoin) { + numMapRowsRead = 0; + firstRow = true; - // all other tables are small, and are cached in the hash table - posBigTable = (byte) conf.getPosBigTable(); + // all other tables are small, and are cached in the hash table + posBigTable = (byte) conf.getPosBigTable(); - emptyList = new RowContainer>(1, hconf, reporter); + emptyList = new RowContainer>(1, hconf, reporter); - RowContainer> bigPosRC = JoinUtil.getRowContainer(hconf, - rowContainerStandardObjectInspectors[posBigTable], - posBigTable, joinCacheSize,spillTableDesc, conf, - !hasFilter(posBigTable), reporter); - storage[posBigTable] = bigPosRC; + RowContainer> bigPosRC = + JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[posBigTable], + posBigTable, joinCacheSize, spillTableDesc, conf, !hasFilter(posBigTable), reporter); + storage[posBigTable] = bigPosRC; + } initializeChildren(hconf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 03194a4..9872741 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -217,7 +217,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { joinFilters = new List[tagLen]; JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE); - joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues, inputObjInspectors,NOTSKIPBIGTABLE, tagLen); joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java new file mode 100644 index 0000000..1a7c850 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -0,0 +1,493 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; + +public class CommonMergeJoinOperator extends AbstractMapJoinOperator implements + Serializable { + + private static final long serialVersionUID = 1L; + JoinOperator joinOp; + private boolean isBigTableWork; + private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName()); + private Map aliasToInputNameMap; + Map> aliasToKeyMap = new HashMap>(); + private static int count = 0; + transient List[] keyWritables; + transient List[] nextKeyWritables; + RowContainer>[] nextGroupStorage; + RowContainer>[] candidateStorage; + + transient String[] tagToAlias; + private transient boolean[] fetchDone; + private transient boolean[] foundNextKeyGroup; + transient boolean firstFetchHappened = false; + transient boolean localWorkInited = false; + transient boolean initDone = false; + transient List otherKey = null; + transient List values = null; + + public CommonMergeJoinOperator() { + this.joinOp = null; + } + + public CommonMergeJoinOperator(JoinOperator joinOp) { + super(); + this.joinOp = joinOp; + } + + @Override + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + int maxAlias = 0; + for (byte pos = 0; pos < order.length; pos++) { + if (pos > maxAlias) { + maxAlias = pos; + } + } + maxAlias += 1; + + nextGroupStorage = new RowContainer[maxAlias]; + candidateStorage = new RowContainer[maxAlias]; + keyWritables = new ArrayList[maxAlias]; + nextKeyWritables = new ArrayList[maxAlias]; + fetchDone = new boolean[maxAlias]; + foundNextKeyGroup = new boolean[maxAlias]; + + int bucketSize; + + int oldVar = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE); + if (oldVar != 100) { + bucketSize = oldVar; + } else { + bucketSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESMBJOINCACHEROWS); + } + + for (byte pos = 0; pos < order.length; pos++) { + RowContainer> rc = + JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], pos, + bucketSize, spillTableDesc, conf, !hasFilter(pos), reporter); + nextGroupStorage[pos] = rc; + RowContainer> candidateRC = + JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], pos, + bucketSize, spillTableDesc, conf, !hasFilter(pos), reporter); + candidateStorage[pos] = candidateRC; + } + + for (byte pos = 0; pos < order.length; pos++) { + if (pos != posBigTable) { + fetchDone[pos] = false; + } + foundNextKeyGroup[pos] = false; + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.ql.exec.Operator#processOp(java.lang.Object, + * int) this processor has a push-pull model. First call to this method is a + * push but the rest is pulled until we run out of records. 
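For orientation, the group-at-a-time advance this operator implements (buffer the current key group per input, join the groups that share the smallest key, then pull the lagging inputs forward) can be pictured without any of the RowContainer or Writable machinery. The following is a self-contained, inner-join-only sketch; the class and method names are illustrative and not part of this patch.

import java.util.*;

// Simplified model of the merge-join group advance: each input is already
// sorted by join key, and rows with equal keys form a "group". Join the
// groups that share a key, then move both cursors past that group.
public class MergeGroupSketch {
  static void mergeJoin(List<int[]> left, List<int[]> right) {   // each row is {key, value}
    int i = 0, j = 0;
    while (i < left.size() && j < right.size()) {
      int lk = left.get(i)[0], rk = right.get(j)[0];
      if (lk < rk) {
        i = skipGroup(left, i);                                   // left group has no match
      } else if (lk > rk) {
        j = skipGroup(right, j);                                  // right group has no match
      } else {
        int iEnd = skipGroup(left, i), jEnd = skipGroup(right, j);
        for (int a = i; a < iEnd; a++) {                          // cross product of the two groups
          for (int b = j; b < jEnd; b++) {
            System.out.println("key=" + lk + " left=" + left.get(a)[1] + " right=" + right.get(b)[1]);
          }
        }
        i = iEnd;                                                 // advance both inputs past the group
        j = jEnd;
      }
    }
  }

  static int skipGroup(List<int[]> rows, int start) {
    int key = rows.get(start)[0], end = start;
    while (end < rows.size() && rows.get(end)[0] == key) {
      end++;
    }
    return end;
  }

  public static void main(String[] args) {
    List<int[]> left = Arrays.asList(new int[]{1, 10}, new int[]{2, 20}, new int[]{2, 21}, new int[]{4, 40});
    List<int[]> right = Arrays.asList(new int[]{2, 200}, new int[]{2, 201}, new int[]{3, 300}, new int[]{4, 400});
    mergeJoin(left, right);
  }
}

In the operator itself, candidateStorage and nextGroupStorage play the role of the two index ranges, and putDummyOrEmpty supplies empty or dummy rows for outer joins, which this sketch leaves out.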
+ */ + @Override + public void processOp(Object row, int tag) throws HiveException { + + byte alias = (byte) tag; + + if (!firstFetchHappened) { + firstFetchHappened = true; + // fetch the first group for all small table aliases + for (byte pos = 0; pos < order.length; pos++) { + if (pos != posBigTable) { + fetchNextGroup(pos); + } + } + } + + // compute keys and values as StandardObjects + List key = mergeJoinComputeKeys(row, alias); + + List value = getFilteredValue(alias, row); + + + //have we reached a new key group? + boolean nextKeyGroup = processKey(alias, key); + LOG.info("Next key group? " + nextKeyGroup); + if (nextKeyGroup) { + //assert this.nextGroupStorage[alias].size() == 0; + this.nextGroupStorage[alias].addRow(value); + foundNextKeyGroup[tag] = true; + if (tag != posBigTable) { + LOG.info("tag " + tag + " != posBigTable " + posBigTable); + return; + } + } + + reportProgress(); + numMapRowsRead++; + + // the big table has reached a new key group. try to let the small tables + // catch up with the big table. + if (nextKeyGroup) { + assert tag == posBigTable; + List smallestPos = null; + do { + smallestPos = joinOneGroup(); + //jump out the loop if we need input from the big table + } while (smallestPos != null && smallestPos.size() > 0 + && !smallestPos.contains(this.posBigTable)); + + return; + } + + assert !nextKeyGroup; + candidateStorage[tag].addRow(value); + + } + + private List joinOneGroup() throws HiveException { + int[] smallestPos = findSmallestKey(); + List listOfNeedFetchNext = null; + if (smallestPos != null) { + listOfNeedFetchNext = joinObject(smallestPos); + if (listOfNeedFetchNext.size() > 0) { + // listOfNeedFetchNext contains all tables that we have joined data in their + // candidateStorage, and we need to clear candidate storage and promote their + // nextGroupStorage to candidateStorage and fetch data until we reach a + // new group. + for (Byte b : listOfNeedFetchNext) { + try { + fetchNextGroup(b); + } catch (Exception e) { + throw new HiveException(e); + } + } + } + } + return listOfNeedFetchNext; + } + + private List joinObject(int[] smallestPos) throws HiveException { + List needFetchList = new ArrayList(); + byte index = (byte) (smallestPos.length - 1); + for (; index >= 0; index--) { + if (smallestPos[index] > 0 || keyWritables[index] == null) { + putDummyOrEmpty(index); + continue; + } + storage[index] = candidateStorage[index]; + needFetchList.add(index); + if (smallestPos[index] < 0) { + break; + } + } + for (index--; index >= 0; index--) { + putDummyOrEmpty(index); + } + checkAndGenObject(); + for (Byte pos : needFetchList) { + this.candidateStorage[pos].clearRows(); + this.keyWritables[pos] = null; + } + return needFetchList; + } + + private void putDummyOrEmpty(Byte i) { + // put a empty list or null + if (noOuterJoin) { + storage[i] = emptyList; + } else { + storage[i] = dummyObjVectors[i]; + } + } + + private int[] findSmallestKey() { + int[] result = new int[order.length]; + List smallestOne = null; + + for (byte pos = 0; pos < order.length; pos++) { + List key = keyWritables[pos]; + if (key == null) { + continue; + } + if (smallestOne == null) { + smallestOne = key; + result[pos] = -1; + continue; + } + result[pos] = compareKeys(key, smallestOne); + if (result[pos] < 0) { + smallestOne = key; + } + } + return smallestOne == null ? 
null : result; + } + + private void fetchNextGroup(Byte t) throws HiveException { + LOG.info("Fetching next group for alias: " + t); + if (foundNextKeyGroup[t]) { + // first promote the next group to be the current group if we reached a + // new group in the previous fetch + if (this.nextKeyWritables[t] != null) { + promoteNextGroupToCandidate(t); + } else { + LOG.info("Key writables being nulled " + t); + this.keyWritables[t] = null; + this.candidateStorage[t] = null; + this.nextGroupStorage[t] = null; + } + foundNextKeyGroup[t] = false; + } + // for the big table, we only need to promote the next group to the current group. + if (t == posBigTable) { + return; + } + + // for tables other than the big table, we need to fetch more data until reach a new group or + // done. + while (!foundNextKeyGroup[t]) { + if (fetchDone[t]) { + break; + } + fetchOneRow(t); + } + if (!foundNextKeyGroup[t] && fetchDone[t]) { + this.nextKeyWritables[t] = null; + } + } + + @Override + public void closeOp(boolean abort) throws HiveException { + joinFinalLeftData(); + + // clean up + for (int pos = 0; pos < order.length; pos++) { + if (pos != posBigTable) { + fetchDone[pos] = false; + } + foundNextKeyGroup[pos] = false; + } + } + + private void fetchOneRow(byte tag) throws HiveException { + LOG.info("Dumping alias to input name map: " + aliasToInputNameMap.toString() + " tag: " + tag + + " input name: " + aliasToInputNameMap.get(Integer.valueOf(tag)) + " int tag: " + + Integer.valueOf(tag)); + try { + Object rowOfSide = + getParentOperators().get(tag).getNextRow(aliasToInputNameMap.get(Integer.valueOf(tag))); + InspectableObject inspectableObj = (InspectableObject) rowOfSide; + this.processOp(inspectableObj.o, tag); + } catch (Exception e) { + throw new HiveException(e); + } + } + + private void joinFinalLeftData() throws HiveException { + LOG.info("Join final left data"); + RowContainer bigTblRowContainer = this.candidateStorage[this.posBigTable]; + + boolean allFetchDone = allFetchDone(); + // if all left data in small tables are less than and equal to the left data + // in big table, let's them catch up + while (bigTblRowContainer != null && bigTblRowContainer.rowCount() > 0 && !allFetchDone) { + joinOneGroup(); + bigTblRowContainer = this.candidateStorage[this.posBigTable]; + allFetchDone = allFetchDone(); + } + + while (!allFetchDone) { + List ret = joinOneGroup(); + if (ret == null || ret.size() == 0) { + break; + } + reportProgress(); + numMapRowsRead++; + allFetchDone = allFetchDone(); + } + + boolean dataInCache = true; + while (dataInCache) { + for (byte pos = 0; pos < order.length; pos++) { + if (this.foundNextKeyGroup[pos] && this.nextKeyWritables[pos] != null) { + promoteNextGroupToCandidate(pos); + } + } + joinOneGroup(); + dataInCache = false; + for (byte pos = 0; pos < order.length; pos++) { + if (this.candidateStorage[pos].rowCount() > 0) { + dataInCache = true; + break; + } + } + } + } + + private boolean allFetchDone() { + boolean allFetchDone = true; + for (byte pos = 0; pos < order.length; pos++) { + if (pos == posBigTable) { + continue; + } + allFetchDone = allFetchDone && fetchDone[pos]; + } + return allFetchDone; + } + + private void promoteNextGroupToCandidate(Byte t) throws HiveException { + LOG.info("Setting the key writables for the next key for alias " + t); + this.keyWritables[t] = this.nextKeyWritables[t]; + this.nextKeyWritables[t] = null; + RowContainer> oldRowContainer = this.candidateStorage[t]; + oldRowContainer.clearRows(); + this.candidateStorage[t] = 
this.nextGroupStorage[t]; + this.nextGroupStorage[t] = oldRowContainer; + } + + private boolean processKey(byte alias, List key) throws HiveException { + List keyWritable = keyWritables[alias]; + LOG.info("Key writable: " + keyWritable); + if (keyWritable == null) { + // the first group. + LOG.info("Key writables being set to " + key + " for alias: " + alias); + keyWritables[alias] = key; + return false; + } else { + int cmp = compareKeys(key, keyWritable); + LOG.info("Comparing keys: " + key + " key writable: " + keyWritable + " cmp: " + cmp); + if (cmp != 0) { + nextKeyWritables[alias] = key; + return true; + } + return false; + } + } + + private int compareKeys(List k1, List k2) { + int ret = 0; + + // join keys have difference sizes? + ret = k1.size() - k2.size(); + if (ret != 0) { + return ret; + } + + for (int i = 0; i < k1.size(); i++) { + WritableComparable key_1 = (WritableComparable) k1.get(i); + WritableComparable key_2 = (WritableComparable) k2.get(i); + if (key_1 == null && key_2 == null) { + return nullsafes != null && nullsafes[i] ? 0 : -1; // just return k1 is + // smaller than k2 + } else if (key_1 == null) { + return -1; + } else if (key_2 == null) { + return 1; + } + ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2); + if (ret != 0) { + return ret; + } + } + return ret; + } + + private List mergeJoinComputeKeys(Object row, Byte alias) throws HiveException { + return JoinUtil.computeKeys(row, joinKeys[alias], joinKeysObjectInspectors[alias]); + } + + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "MERGEJOIN"; + } + + @Override + public OperatorType getType() { + return OperatorType.MERGEJOIN; + } + + @Override + public void initializeLocalWork(Configuration hconf) throws HiveException { + Operator parent = null; + if (count != 0) { + throw new HiveException("2nd count"); + } + count++; + for (Operator parentOp : parentOperators) { + if (parentOp != null) { + parent = parentOp; + break; + } + } + + if (parent == null) { + throw new HiveException("No valid parents."); + } + Map dummyOps = parent.getConnectOps(); + for (Entry connectOp : dummyOps.entrySet()) { + parentOperators.add(connectOp.getValue(), connectOp.getKey()); + connectOp.getKey().getChildOperators().add(this); + } + super.initializeLocalWork(hconf); + return; + } + + public void setJoinOp(JoinOperator joinOp) { + this.joinOp = joinOp; + } + + public boolean isBigTableWork() { + return isBigTableWork; + } + + public void setIsBigTableWork(boolean bigTableWork) { + this.isBigTableWork = bigTableWork; + } + + public Map getAliasToInputNameMap() { + return aliasToInputNameMap; + } + + public void setAliasToInputNameMap(Map aliasToInputNameMap) { + this.aliasToInputNameMap = aliasToInputNameMap; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java index b8f5227..b2e6478 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java @@ -74,11 +74,11 @@ public DummyStoreOperator() { @Override protected void initializeOp(Configuration hconf) throws HiveException { /* - * The conversion to standard object inspector was necessitated by HIVE-5973. The issue - * happens when a select operator preceeds this operator as in the case of a subquery. The - * select operator does not allocate a new object to hold the deserialized row. 
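The HIVE-5973 rationale in the surrounding comment is the standard reused-row-buffer pitfall: when an upstream operator recycles a single object per row, queuing references instead of copies leaves every queued entry pointing at the last row. A minimal standalone illustration (not Hive code; the row is modeled as a mutable Object[]):

import java.util.*;

// Shows why the row is copied before being handed out: the producer below
// reuses one buffer, so storing the reference puts N views of the same
// (last) row into the list, while storing a copy preserves each row.
public class ReusedRowPitfall {
  public static void main(String[] args) {
    Object[] reused = new Object[1];                 // producer's single, recycled row buffer
    List<Object[]> byReference = new ArrayList<>();
    List<Object[]> byCopy = new ArrayList<>();
    for (int key : new int[]{1, 2, 3}) {
      reused[0] = key;                               // producer overwrites the same buffer
      byReference.add(reused);                       // buggy: keeps a reference to the shared buffer
      byCopy.add(reused.clone());                    // fix: store an independent copy
    }
    System.out.println("by reference: " + Arrays.deepToString(byReference.toArray())); // [[3], [3], [3]]
    System.out.println("by copy:      " + Arrays.deepToString(byCopy.toArray()));      // [[1], [2], [3]]
  }
}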
This affects + * The conversion to standard object inspector was necessitated by HIVE-5973. The issue + * happens when a select operator preceeds this operator as in the case of a subquery. The + * select operator does not allocate a new object to hold the deserialized row. This affects * the operation of the SMB join which puts the object in a priority queue. Since all elements - * of the priority queue point to the same object, the join was resulting in incorrect + * of the priority queue point to the same object, the join was resulting in incorrect * results. * * So the fix is to make a copy of the object as done in the processOp phase below. This @@ -95,6 +95,7 @@ public void processOp(Object row, int tag) throws HiveException { // Store the row. See comments above for why we need a new copy of the row. result.o = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); + LOG.info("We are in the process op of dummy store. result is: " + result); } @Override @@ -110,4 +111,15 @@ public InspectableObject getResult() { public OperatorType getType() { return OperatorType.FORWARD; } + + @Override + public Object getNextRow(String string) throws Exception { + // all operators except the MapOperator just fetch and process the row. + assert (getParentOperators().size() == 1); + // this call ensures that we compute a result. + if (getParentOperators().get(0).getNextRow(string) == null) { + return null; + } + return result; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index d5de58e..d8fd041 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -33,8 +33,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor; +import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -180,7 +181,7 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, PartitionDesc pd = ctx.partDesc; TableDesc td = pd.getTableDesc(); - + MapOpCtx opCtx = new MapOpCtx(); // Use table properties in case of unpartitioned tables, // and the union of table properties and partition properties, with partition @@ -204,42 +205,42 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter( partRawRowObjectInspector, opCtx.tblRawRowObjectInspector); - + // Next check if this table has partitions and if so // get the list of partition names as well as allocate // the serdes for the partition columns String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - + if (pcols != null && pcols.length() > 0) { String[] partKeys = pcols.trim().split("/"); String pcolTypes = overlayedProps .getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); String[] partKeyTypes = pcolTypes.trim().split(":"); - + if (partKeys.length > partKeyTypes.length) { throw new HiveException("Internal error : partKeys length, " +partKeys.length + " greater than 
partKeyTypes length, " + partKeyTypes.length); } - + List partNames = new ArrayList(partKeys.length); Object[] partValues = new Object[partKeys.length]; List partObjectInspectors = new ArrayList(partKeys.length); - + for (int i = 0; i < partKeys.length; i++) { String key = partKeys[i]; partNames.add(key); ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector (TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i])); - + // Partitions do not exist for this table if (partSpec == null) { // for partitionless table, initialize partValue to null partValues[i] = null; } else { - partValues[i] = + partValues[i] = ObjectInspectorConverters. getConverter(PrimitiveObjectInspectorFactory. - javaStringObjectInspector, oi).convert(partSpec.get(key)); + javaStringObjectInspector, oi).convert(partSpec.get(key)); } partObjectInspectors.add(oi); } @@ -340,6 +341,55 @@ private boolean isPartitioned(PartitionDesc pd) { return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty(); } + public void setChildrenSecondary(Configuration hconf) throws HiveException { + + List> children = + new ArrayList>(); + + Map convertedOI = getConvertedOI(hconf); + try { + for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + String onefile = entry.getKey(); + List aliases = entry.getValue(); + + Path onepath = new Path(onefile); + + PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); + + for (String onealias : aliases) { + Operator op = conf.getAliasToWork().get(onealias); + MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); + if (opCtxMap.containsKey(inp)) { + continue; + } + MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI); + opCtxMap.put(inp, opCtx); + + op.setParentOperators(new ArrayList>()); + op.getParentOperators().add(this); + // check for the operators who will process rows coming to this Map + // Operator + children.add(op); + childrenOpToOpCtxMap.put(op, opCtx); + LOG.info("dump " + op + " " + + opCtxMap.get(inp).rowObjectInspector.getTypeName()); + current = opCtx; // just need for TestOperators.testMapOperator + } + } + + if (children.size() == 0) { + // didn't find match for input file path in configuration! + // serious problem .. + throw new HiveException("Unable to set up children"); + } + + // we found all the operators that we are supposed to process. 
+ setChildOperators(children); + } catch (Exception e) { + throw new HiveException(e); + } + } + public void setChildren(Configuration hconf) throws HiveException { Path fpath = IOContext.get().getInputPath(); @@ -350,9 +400,10 @@ public void setChildren(Configuration hconf) throws HiveException { new ArrayList>(); Map convertedOI = getConvertedOI(hconf); - try { for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + LOG.info("Path to aliases: " + entry.getKey() + " values: " + + Arrays.toString(entry.getValue().toArray())); String onefile = entry.getKey(); List aliases = entry.getValue(); @@ -366,7 +417,7 @@ public void setChildren(Configuration hconf) throws HiveException { for (String onealias : aliases) { Operator op = conf.getAliasToWork().get(onealias); if (LOG.isDebugEnabled()) { - LOG.debug("Adding alias " + onealias + " to work list for file " + LOG.info("Adding alias " + onealias + " to work list for file " + onefile); } MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); @@ -625,4 +676,23 @@ public OperatorType getType() { return null; } + @Override + public Object getNextRow(String inputName) throws Exception { + // This map operator definitely belongs to merge work. + Object nextRow = MapRecordProcessor.getNextRow(inputName); + ExecMapperContext context = getExecContext(); + boolean resetValue = context.inputFileChanged(); + context.setSkipFileChangedCheck(true); + if (nextRow == null) { + return nextRow; + } + process((Writable) nextRow); + context.setSkipFileChangedCheck(resetValue); + return nextRow; + } + + @Override + public Map getConnectOps() { + return MapRecordProcessor.getConnectOps(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index db94271..ff9bea6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -146,6 +146,7 @@ public int getNumChild() { /** * Implements the getChildren function for the Node Interface. 
*/ + @Override public ArrayList getChildren() { if (getChildOperators() == null) { @@ -309,10 +310,13 @@ protected boolean areAllParentsInitialized() { //return true; continue; } + LOG.info("Parent being checked for initialization: " + parent); if (parent.state != State.INIT) { + LOG.info("Parent state is not initialized: " + parent); return false; } } + LOG.info("Parents are initialized"); return true; } @@ -335,6 +339,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs) this.configuration = hconf; if (!areAllParentsInitialized()) { + LOG.info("Not all parents initialized"); return; } @@ -377,6 +382,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs) //pass the exec context to child operators passExecContext(this.execContext); + LOG.info("Initialize the operator - " + getName()); initializeOp(hconf); // sanity check @@ -450,6 +456,8 @@ public void passExecContext(ExecMapperContext execContext) { */ protected void initialize(Configuration hconf, ObjectInspector inputOI, int parentId) throws HiveException { + LOG.info("id is " + id); + LOG.info("name is " + getName()); LOG.info("Initializing child " + id + " " + getName()); // Double the size of the array if needed if (parentId >= inputObjInspectors.length) { @@ -851,6 +859,7 @@ public void logStats() { * * @return the name of the operator */ + @Override public String getName() { return getOperatorName(); } @@ -1061,7 +1070,7 @@ public boolean supportSkewJoinOptimization() { if (parents != null) { for (Operator parent : parents) { - parentClones.add((Operator)(parent.clone())); + parentClones.add((parent.clone())); } } @@ -1082,8 +1091,8 @@ public boolean supportSkewJoinOptimization() { public Operator cloneOp() throws CloneNotSupportedException { T descClone = (T) conf.clone(); Operator ret = - (Operator) OperatorFactory.getAndMakeChild( - descClone, getSchema()); + OperatorFactory.getAndMakeChild( + descClone, getSchema()); return ret; } @@ -1254,15 +1263,15 @@ public Statistics getStatistics() { } return null; } - + public OpTraits getOpTraits() { if (conf != null) { return conf.getOpTraits(); } - + return null; } - + public void setOpTraits(OpTraits metaInfo) { if (LOG.isDebugEnabled()) { LOG.debug("Setting traits ("+metaInfo+") on "+this); @@ -1299,7 +1308,24 @@ public static Operator createDummy() { private static class DummyOperator extends Operator { public DummyOperator() { super("dummy"); } + @Override public void processOp(Object row, int tag) { } + @Override public OperatorType getType() { return null; } } + + public Object getNextRow(String string) throws Exception { + // all operators except the MapOperator just fetch and process the row. 
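The delegation described by this comment gives the small-table side a pull API layered over the usual push pipeline: intermediate operators simply forward getNextRow to their single parent, and only the source (the MapOperator in this patch) actually produces a row and pushes it through its own process() call before returning it. A toy version outside Hive's class hierarchy, with hypothetical names:

import java.util.*;

// Toy pull chain: intermediate nodes delegate getNextRow to their parent,
// while the source node drains an iterator and "processes" each row before
// returning it, mirroring how MapOperator.getNextRow feeds process(row).
public class PullChainSketch {
  interface Node { Object getNextRow() throws Exception; }

  static class Intermediate implements Node {
    final Node parent;
    Intermediate(Node parent) { this.parent = parent; }
    public Object getNextRow() throws Exception {
      return parent.getNextRow();                    // just fetch from the single parent
    }
  }

  static class Source implements Node {
    final Iterator<String> input;
    Source(Iterator<String> input) { this.input = input; }
    public Object getNextRow() {
      if (!input.hasNext()) return null;             // end of this input
      String row = input.next();
      process(row);                                  // push through the normal operator path
      return row;
    }
    void process(String row) { System.out.println("processed " + row); }
  }

  public static void main(String[] args) throws Exception {
    Node chain = new Intermediate(new Source(Arrays.asList("r1", "r2").iterator()));
    for (Object row; (row = chain.getNextRow()) != null; ) {
      System.out.println("pulled " + row);
    }
  }
}

DummyStoreOperator overrides this default to return its copied result after asking its own parent for the next row, and MapOperator sits at the source end of the chain.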
+ assert (getParentOperators().size() == 1); + Object row = getParentOperators().get(0).getNextRow(string); + return row; + } + + public Map getConnectOps() { + if ((parentOperators == null) || (parentOperators.size() == 0)) { + return null; + } + Map dummyOps = parentOperators.get(0).getConnectOps(); + return dummyOps; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 487bb33..958315f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -84,6 +84,7 @@ private transient boolean convertedAutomaticallySMBJoin = false; public SMBMapJoinOperator() { + super(); } public SMBMapJoinOperator(AbstractMapJoinOperator mapJoinOp) { @@ -226,7 +227,7 @@ private byte tagForAlias(String alias) { public void cleanUpInputFileChangedOp() throws HiveException { inputFileChanged = true; } - + protected List smbJoinComputeKeys(Object row, byte alias) throws HiveException { return JoinUtil.computeKeys(row, joinKeys[alias], joinKeysObjectInspectors[alias]); @@ -265,8 +266,8 @@ public void processOp(Object row, int tag) throws HiveException { byte alias = (byte) tag; // compute keys and values as StandardObjects - List key = smbJoinComputeKeys(row, alias); - + List key = smbJoinComputeKeys(row, alias); + List value = getFilteredValue(alias, row); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 4450ad3..99730d3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -141,6 +141,7 @@ import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -197,6 +198,7 @@ public static String HADOOP_LOCAL_FS = "file:///"; public static String MAP_PLAN_NAME = "map.xml"; public static String REDUCE_PLAN_NAME = "reduce.xml"; + public static String MERGE_PLAN_NAME = "merge.xml"; public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; @@ -330,7 +332,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { } if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { - LOG.debug("Loading plan from string: "+path.toUri().getPath()); + LOG.info("Loading plan from string: " + path.toUri().getPath()); String planString = conf.get(path.toUri().getPath()); if (planString == null) { LOG.info("Could not find plan string in conf"); @@ -343,7 +345,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { in = new FileInputStream(localPath.toUri().getPath()); } - if(MAP_PLAN_NAME.equals(name)){ + if (MAP_PLAN_NAME.equals(name)) { if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ gWork = deserializePlan(in, MapWork.class, conf); } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { @@ -363,6 +365,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) { throw new RuntimeException("unable to determine work from configuration ." 
+ MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; } + } else if (MERGE_PLAN_NAME.equals(name)) { + gWork = deserializePlan(in, MergeJoinWork.class, conf); } gWorkMap.put(path, gWork); } else { @@ -593,7 +597,8 @@ public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScra return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache); } - private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) { + private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, + String name, boolean useCache) { try { setPlanPath(conf, hiveScratchDir); @@ -605,6 +610,7 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch // add it to the conf ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED)); + serializePlan(w, out, conf); LOG.info("Setting plan: "+planPath.toUri().getPath()); conf.set(planPath.toUri().getPath(), @@ -3424,7 +3430,7 @@ public static boolean createDirsWithPermission(Configuration conf, Path mkdir, return createDirsWithPermission(conf, mkdir, fsPermission, recursive); } - private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, + private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, String origUmask, FileSystem fs) throws IOException { if (unsetUmask) { if (origUmask != null) { @@ -3499,4 +3505,13 @@ public static String getQualifiedPath(HiveConf conf, Path path) throws HiveExcep public static boolean isDefaultNameNode(HiveConf conf) { return !conf.getChangedProperties().containsKey(HiveConf.ConfVars.HADOOPFS.varname); } + + public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir, + boolean b) { + return setBaseWork(conf, mergeJoinWork, mrScratchDir, MERGE_PLAN_NAME, b); + } + + public static MergeJoinWork getMergeWork(JobConf jconf, Class clazz) { + return (MergeJoinWork) getBaseWork(jconf, MERGE_PLAN_NAME); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java index 74bc2d2..6dc9fef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java @@ -52,6 +52,8 @@ private String currentBigBucketFile=null; + private boolean skipFileChangedCheck; + public String getCurrentBigBucketFile() { return currentBigBucketFile; } @@ -80,6 +82,9 @@ public void clear() { * @return is the input file changed? 
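The merge.xml handling added to Utilities above follows the existing plan-shipping pattern: serialize and compress the work object, embed it in the configuration under the plan path (when HIVE_RPC_QUERY_PLAN is on), and deserialize it by plan name on the task side. A simplified, JDK-only sketch of that round trip; JDK object serialization and a plain Map stand in for Hive's serializePlan/deserializePlan and the JobConf, and the path in main() is made up.

import java.io.*;
import java.util.*;
import java.util.zip.*;

// Ship a work plan inside the "configuration": compress and base64 it on the
// client, look it up by plan path and inflate it on the task side.
public class PlanInConfSketch {
  static void setWork(Map<String, String> conf, String planPath, Serializable work) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out =
        new ObjectOutputStream(new DeflaterOutputStream(bytes, new Deflater(Deflater.BEST_SPEED)))) {
      out.writeObject(work);                                     // stands in for serializePlan(w, out, conf)
    }
    conf.put(planPath, Base64.getEncoder().encodeToString(bytes.toByteArray()));
  }

  static Object getWork(Map<String, String> conf, String planPath) throws Exception {
    String planString = conf.get(planPath);
    if (planString == null) return null;                         // "Could not find plan string in conf"
    byte[] raw = Base64.getDecoder().decode(planString);
    try (ObjectInputStream in =
        new ObjectInputStream(new InflaterInputStream(new ByteArrayInputStream(raw)))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> conf = new HashMap<>();
    setWork(conf, "/scratch/plan/merge.xml", new ArrayList<>(Arrays.asList("MergeJoinWork stand-in")));
    System.out.println(getWork(conf, "/scratch/plan/merge.xml"));
  }
}

getMergeWork above performs the same lookup for MERGE_PLAN_NAME that getBaseWork already does for map.xml and reduce.xml.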
*/ public boolean inputFileChanged() { + if (skipFileChangedCheck) { + return false; + } if (!inputFileChecked) { currentInputPath = this.ioCxt.getInputPath(); inputFileChecked = true; @@ -153,4 +158,8 @@ public IOContext getIoCxt() { public void setIoCxt(IOContext ioCxt) { this.ioCxt = ioCxt; } + + public void setSkipFileChangedCheck(boolean skipFileChangedCheck) { + this.skipFileChangedCheck = skipFileChangedCheck; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index 407d8ac..edb716e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -31,7 +31,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.BaseWork.VertexType; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.mapred.FileSplit; @@ -46,7 +48,6 @@ import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; @@ -78,9 +79,10 @@ private List dataInformationEvents; private int numBuckets = -1; private Configuration conf = null; - private boolean rootVertexInitialized = false; private final SplitGrouper grouper = new SplitGrouper(); private int taskCount = 0; + private VertexType vertexType; + private String inputNameDecidingParallelism; public CustomPartitionVertex(VertexManagerPluginContext context) { super(context); @@ -89,14 +91,22 @@ public CustomPartitionVertex(VertexManagerPluginContext context) { @Override public void initialize() { this.context = getContext(); - ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload()); - this.numBuckets = byteBuf.getInt(); + byte[] payload = context.getUserPayload(); + CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(payload, payload.length); + try { + vertexConf.readFields(dib); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.numBuckets = vertexConf.getNumBuckets(); } @Override public void onVertexStarted(Map> completions) { int numTasks = context.getVertexNumTasks(context.getVertexName()); - List scheduledTasks = + List scheduledTasks = new ArrayList(numTasks); for (int i = 0; i < numTasks; ++i) { scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null)); @@ -116,13 +126,8 @@ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { @Override public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List events) { + LOG.info("On root vertex initialized"); - // Ideally, since there's only 1 Input expected at the moment - - // ensure this method is called only once. Tez will call it once per Root - // Input. 
- Preconditions.checkState(rootVertexInitialized == false); - LOG.info("Root vertex not initialized"); - rootVertexInitialized = true; try { // This is using the payload from the RootVertexInitializer corresponding // to InputName. Ideally it should be using it's own configuration class - @@ -166,9 +171,6 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr // No tasks should have been started yet. Checked by initial state // check. Preconditions.checkState(dataInformationEventSeen == false); - Preconditions - .checkState(context.getVertexNumTasks(context.getVertexName()) == -1, - "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism"); RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event; // The vertex cannot be configured until all DataEvents are seen - to @@ -250,11 +252,13 @@ private void processAllEvents(String inputName, // Construct the EdgeManager descriptor to be used by all edges which need // the routing table. - EdgeManagerDescriptor hiveEdgeManagerDesc = - new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); - byte[] payload = getBytePayload(bucketToTaskMap); - hiveEdgeManagerDesc.setUserPayload(payload); - + EdgeManagerDescriptor hiveEdgeManagerDesc = null; + if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES) + || (vertexType == VertexType.INITIALIZED_EDGES)) { + hiveEdgeManagerDesc = new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); + byte[] payload = getBytePayload(bucketToTaskMap); + hiveEdgeManagerDesc.setUserPayload(payload); + } Map emMap = Maps.newHashMap(); // Replace the edge manager for all vertices which have routing type custom. @@ -291,7 +295,6 @@ private void processAllEvents(String inputName, taskCount, new VertexLocationHint(grouper.createTaskLocationHints(finalSplits .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); - // Set the actual events for the tasks. context.addRootInputEvents(inputName, taskEvents); } @@ -318,7 +321,8 @@ private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) thr if (!(inputSplit instanceof FileSplit)) { throw new UnsupportedOperationException( - "Cannot handle splits other than FileSplit for the moment"); + "Cannot handle splits other than FileSplit for the moment. 
Current input split type: " + + inputSplit.getClass().getSimpleName()); } return (FileSplit) inputSplit; } @@ -349,4 +353,5 @@ private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) thr return bucketToInitialSplitMap; } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java new file mode 100644 index 0000000..d923fa9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java @@ -0,0 +1,50 @@ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.plan.BaseWork.VertexType; +import org.apache.hadoop.io.Writable; + +public class CustomVertexConfiguration implements Writable { + + private int numBuckets; + private VertexType vertexType; + private String inputName; + + public CustomVertexConfiguration() { + } + + public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) { + this.numBuckets = numBuckets; + this.vertexType = vertexType; + this.inputName = inputName; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(this.vertexType.ordinal()); + out.writeInt(this.numBuckets); + out.writeUTF(inputName); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.vertexType = VertexType.values()[in.readInt()]; + this.numBuckets = in.readInt(); + this.inputName = in.readUTF(); + } + + public int getNumBuckets() { + return numBuckets; + } + + public VertexType getVertexType() { + return vertexType; + } + + public String getInputName() { + return inputName; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index d65dc26..01fada7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -55,7 +55,9 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.BaseWork.VertexType; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; @@ -103,10 +105,13 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.runtime.api.TezRootInputInitializer; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.comparator.TezBytesComparator; +import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer; @@ -237,7 +242,11 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, int numBuckets = 
edgeProp.getNumBuckets(); VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName()); - byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, VertexType.INITIALIZED_EDGES, null); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); + byte[] userPayload = dob.getData(); desc.setUserPayload(userPayload); w.setVertexManagerPlugin(desc); break; @@ -276,9 +285,13 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); - byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, VertexType.INITIALIZED_EDGES, null); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor( CustomPartitionVertex.class.getName()); + byte[] userPayload = dob.getData(); desc.setUserPayload(userPayload); w.setVertexManagerPlugin(desc); break; @@ -310,14 +323,21 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration switch (edgeType) { case BROADCAST_EDGE: UnorderedUnpartitionedKVEdgeConfigurer et1Conf = UnorderedUnpartitionedKVEdgeConfigurer - .newBuilder(keyClass, valClass).setFromConfiguration(conf).build(); + .newBuilder(keyClass, valClass) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName()) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName()) + .build(); return et1Conf.createDefaultBroadcastEdgeProperty(); case CUSTOM_EDGE: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); UnorderedPartitionedKVEdgeConfigurer et2Conf = UnorderedPartitionedKVEdgeConfigurer .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName()) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName()) + .build(); EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); CustomEdgeConfiguration edgeConf = @@ -332,7 +352,10 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration partitionerConf = createPartitionerConf(partitionerClassName, conf); UnorderedPartitionedKVEdgeConfigurer et3Conf = UnorderedPartitionedKVEdgeConfigurer .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName()) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName()) + .build(); return et3Conf.createDefaultEdgeProperty(); case SIMPLE_EDGE: default: @@ -340,14 +363,18 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration partitionerConf = createPartitionerConf(partitionerClassName, conf); OrderedPartitionedKVEdgeConfigurer et4Conf = OrderedPartitionedKVEdgeConfigurer .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), + 
TezBytesComparator.class.getName()) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName()) + .build(); return et4Conf.createDefaultEdgeProperty(); } } /** * Utility method to create a stripped down configuration for the MR partitioner. - * + * * @param partitionerClassName * the real MR partitioner class name * @param baseConf @@ -408,12 +435,67 @@ private String getContainerJavaOpts(Configuration conf) { return MRHelpers.getMapJavaOpts(conf); } + private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr, + List additionalLr, FileSystem fs, Path mrScratchDir, Context ctx) + throws Exception { + if (mergeJoinWork.getMainWork() instanceof MapWork) { + List mapWorkList = mergeJoinWork.getBaseWorkList(); + Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false); + MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork()); + Vertex mergeVx = createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx); + + // grouping happens in execution phase. Setting the class to + // TezGroupedSplitsInputFormat + // here would cause pre-mature grouping which would be incorrect. + Class inputFormatClass = HiveInputFormat.class; + conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class); + // mapreduce.tez.input.initializer.serialize.event.payload should be set + // to false when using this plug-in to avoid getting a serialized event at run-time. + conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false); + for (int i = 0; i < mapWorkList.size(); i++) { + mapWork = (MapWork) (mapWorkList.get(i)); + String alias = mapWork.getAliasToWork().keySet().iterator().next(); + byte[] mrInput = null; + + byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf); + + mrInput = MRHelpers.createMRInputPayload(serializedConf); + LOG.info("Going through each work and adding MultiMRInput"); + mergeVx.addDataSource(mapWork.getName(), + new DataSourceDescriptor( + new InputDescriptor( + MultiMRInput.class.getName()).setUserPayload(mrInput), + new InputInitializerDescriptor( + MRInputAMSplitGenerator.class.getName()).setUserPayload(mrInput), null)); + } + + VertexManagerPluginDescriptor desc = + new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName()); + // FIXME need to set the right bucket and type values + VertexType vertexType = ((MapWork) (mapWorkList.get(0))).getVertexType(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(4, vertexType, mergeJoinWork.getBigTableAlias()); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); + byte[] userPayload = dob.getData(); + desc.setUserPayload(userPayload); + mergeVx.setVertexManagerPlugin(desc); + return mergeVx; + } else { + Vertex mergeVx = + createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs, + mrScratchDir, ctx); + return mergeVx; + } + } + /* * Helper function to create Vertex from MapWork. 
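The vertex-manager payload built in createVertex above is read back in CustomPartitionVertex.initialize earlier in this patch, so the write and read sides must agree on field order. A JDK-only round-trip sketch that mirrors the order used by CustomVertexConfiguration.write()/readFields(); DataOutputStream/DataInputStream stand in for Hadoop's DataOutputBuffer/DataInputBuffer, the enum lists only the two vertex types referenced in this patch, and the input name in main() is made up.

import java.io.*;

// Round trip of a vertex-manager payload: the DAG builder writes the fields
// to a byte[], and the vertex manager reads them back in the same order.
public class VertexPayloadSketch {
  enum VertexType { INITIALIZED_EDGES, MULTI_INPUT_INITIALIZED_EDGES }

  static byte[] write(int numBuckets, VertexType type, String inputName) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(type.ordinal());      // vertex type first ...
    out.writeInt(numBuckets);          // ... then the bucket count ...
    out.writeUTF(inputName);           // ... then the input deciding parallelism
    out.flush();
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = write(4, VertexType.MULTI_INPUT_INITIALIZED_EDGES, "bigTableInput");
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
    VertexType type = VertexType.values()[in.readInt()];
    int numBuckets = in.readInt();
    String inputName = in.readUTF();
    System.out.println(type + " buckets=" + numBuckets + " input=" + inputName);
  }
}

Whatever byte[] is handed to setUserPayload must carry exactly what readFields expects; any change in field order has to be made on both sides.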
*/ private Vertex createVertex(JobConf conf, MapWork mapWork, LocalResource appJarLr, List additionalLr, FileSystem fs, - Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception { + Path mrScratchDir, Context ctx) + throws Exception { Path tezDir = getTezDir(mrScratchDir); @@ -425,7 +507,6 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, // finally create the vertex Vertex map = null; - // use tez to combine splits boolean useTezGroupedSplits = false; @@ -435,15 +516,8 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, Class inputFormatClass = conf.getClass("mapred.input.format.class", InputFormat.class); - boolean vertexHasCustomInput = false; - if (tezWork != null) { - for (BaseWork baseWork : tezWork.getParents(mapWork)) { - if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) { - vertexHasCustomInput = true; - } - } - } - + boolean vertexHasCustomInput = !(mapWork.getDoSplitsGrouping()); + LOG.info("Vertex has custom input? " + vertexHasCustomInput); if (vertexHasCustomInput) { useTezGroupedSplits = false; // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat @@ -500,7 +574,8 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, } map.addDataSource(alias, new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()). - setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput),null)); + setUserPayload(mrInput), new InputInitializerDescriptor( + amSplitGeneratorClass.getName()).setUserPayload(mrInput),null)); Map localResources = new HashMap(); localResources.put(getBaseName(appJarLr), appJarLr); @@ -888,12 +963,22 @@ public JobConf initializeVertexConf(JobConf conf, BaseWork work) { return initializeVertexConf(conf, (MapWork)work); } else if (work instanceof ReduceWork) { return initializeVertexConf(conf, (ReduceWork)work); + } else if (work instanceof MergeJoinWork) { + return initializeVertexConf(conf, (MergeJoinWork) work); } else { assert false; return null; } } + private JobConf initializeVertexConf(JobConf conf, MergeJoinWork work) { + if (work.getMainWork() instanceof MapWork) { + return initializeVertexConf(conf, (MapWork) (work.getMainWork())); + } else { + return initializeVertexConf(conf, (ReduceWork) (work.getMainWork())); + } + } + /** * Create a vertex from a given work object. * @@ -918,10 +1003,13 @@ public Vertex createVertex(JobConf conf, BaseWork work, // BaseWork. 
if (work instanceof MapWork) { v = createVertex(conf, (MapWork) work, appJarLr, - additionalLr, fileSystem, scratchDir, ctx, tezWork); + additionalLr, fileSystem, scratchDir, ctx); } else if (work instanceof ReduceWork) { v = createVertex(conf, (ReduceWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx); + } else if (work instanceof MergeJoinWork) { + v = createVertex(conf, (MergeJoinWork)work, appJarLr, + additionalLr, fileSystem, scratchDir, ctx); } else { // something is seriously wrong if this is happening throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index d964eb1..db626b8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -25,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -36,14 +40,17 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; @@ -57,12 +64,16 @@ public class MapRecordProcessor extends RecordProcessor { + private static KeyValueReader mergeQueue; private MapOperator mapOp; + private MapOperator mergeMapOp = null; public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); private final ExecMapperContext execContext = new ExecMapperContext(); private boolean abort = false; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; + private static Map connectOps = + new HashMap(); @Override void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter, @@ -90,11 +101,18 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr ObjectCache cache = ObjectCacheFactory.getCache(jconf); try { + MapWork mergeMapWork = null; execContext.setJc(jconf); // create map and fetch operators mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); if (mapWork == null) { mapWork = Utilities.getMapWork(jconf); + MergeJoinWork mergeJoinWork = Utilities.getMergeWork(jconf, MergeJoinWork.class); + // 
FIXME currently hard-coded for single side table + if (mergeJoinWork != null) { + mergeMapWork = (MapWork) mergeJoinWork.getBaseWorkList().get(0); + } + cache.cache(MAP_PLAN_KEY, mapWork); l4j.info("Plan: "+mapWork); for (String s: mapWork.getAliases()) { @@ -109,16 +127,38 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr mapOp = new MapOperator(); } + if (mergeMapWork != null) { + if (mergeMapWork.getVectorMode()) { + mergeMapOp = new VectorMapOperator(); + } else { + mergeMapOp = new MapOperator(); + } + } + + // initialize the merge operators first. + if (mergeMapOp != null) { + mergeMapOp.setConf(mergeMapWork); + mergeMapOp.setChildrenSecondary(jconf); + DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp); + connectOps.put(dummyOp, mergeMapWork.getTag()); + mergeMapOp.setExecContext(execContext); + mergeMapOp.initializeLocalWork(jconf); + } + // initialize map operator mapOp.setConf(mapWork); mapOp.setChildren(jconf); - l4j.info(mapOp.dump(0)); + // l4j.info(mapOp.dump(0)); MapredContext.init(true, new JobConf(jconf)); ((TezContext)MapredContext.get()).setInputs(inputs); mapOp.setExecContext(execContext); mapOp.initializeLocalWork(jconf); + mapOp.initialize(jconf, null); + if (mergeMapOp != null) { + mergeMapOp.initialize(jconf, null); + } // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the @@ -148,8 +188,20 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } + private DummyStoreOperator getJoinParentOp(Operator mergeMapOp) { + for (Operator childOp : mergeMapOp.getChildOperators()) { + l4j.info("Child op we are walking: " + childOp); + if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) { + return (DummyStoreOperator) childOp; + } else { + return getJoinParentOp(childOp); + } + } + return null; + } + @Override - void run() throws IOException{ + void run() throws IOException, InterruptedException { MRInputLegacy in = TezProcessor.getMRInput(inputs); KeyValueReader reader = in.getReader(); @@ -239,4 +291,28 @@ void close(){ MapredContext.close(); } } + + public static Object getNextRow(String inputName) throws Exception { + // get the next row from the priority queue corresponding to this input name + l4j.info("We are getting next row"); + if (mergeQueue == null) { + MultiMRInput multiMRInput = TezProcessor.getInput(inputName); + l4j.info("Multi MR input is: " + multiMRInput); + Collection kvReaders = multiMRInput.getKeyValueReaders(); + l4j.info("Multi mr inputs get key value readers returned: " + kvReaders.size()); + List kvReaderList = new ArrayList(kvReaders); + mergeQueue = new KeyValueInputMerger(kvReaderList); + } + + if (mergeQueue.next()) { + l4j.info("Returning the next row"); + return mergeQueue.getCurrentValue(); + } else { + return null; + } + } + + public static Map getConnectOps() { + return connectOps; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index e884afd..ba8923e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.LinkedList; import 
java.util.List; import java.util.Map; @@ -38,7 +37,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; -import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValuesInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -59,7 +58,6 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.processor.MRTaskReporter; @@ -120,6 +118,10 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr ObjectCache cache = ObjectCacheFactory.getCache(jconf); + for (Entry entry : inputs.entrySet()) { + l4j.info("REDUCER name : " + entry.getKey() + " Logical input type: " + entry.getValue()); + } + rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; @@ -140,7 +142,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr try { keyTableDesc = redWork.getKeyDesc(); - inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc .getDeserializerClass(), null); SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); keyObjectInspector = inputKeyDeserializer.getObjectInspector(); @@ -152,7 +154,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr keyStructInspector = (StructObjectInspector)keyObjectInspector; batches = new VectorizedRowBatch[maxTags]; valueStructInspectors = new StructObjectInspector[maxTags]; - valueStringWriters = (List[])new List[maxTags]; + valueStringWriters = new List[maxTags]; keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); buffer = new DataOutputBuffer(); } @@ -279,7 +281,7 @@ void run() throws Exception { kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader(); }else { //get a sort merged input - kvsReader = new InputMerger(shuffleInputs); + kvsReader = new KeyValuesInputMerger(shuffleInputs); } } catch (Exception e) { throw new IOException(e); @@ -293,7 +295,6 @@ void run() throws Exception { break; } } - } /** @@ -306,7 +307,7 @@ void run() throws Exception { Map tag2input = redWork.getTagToInput(); ArrayList shuffleInputs = new ArrayList(); for(String inpStr : tag2input.values()){ - shuffleInputs.add((LogicalInput)inputs.get(inpStr)); + shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index ea3770d..0b77d39 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; import java.text.NumberFormat; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ 
-32,6 +33,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; @@ -60,6 +62,7 @@ private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); private TezProcessorContext processorContext; + private static Map<String, MultiMRInput> multiMRInputMap; protected static final NumberFormat taskIdFormat = NumberFormat.getInstance(); protected static final NumberFormat jobIdFormat = NumberFormat.getInstance(); @@ -96,6 +99,7 @@ public void initialize() throws IOException { Configuration conf = TezUtils.createConfFromUserPayload(userPayload); this.jobConf = new JobConf(conf); setupMRLegacyConfigs(processorContext); + multiMRInputMap = new HashMap<String, MultiMRInput>(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } @@ -139,10 +143,12 @@ public void run(Map inputs, Map out if (isMap) { rproc = new MapRecordProcessor(); MRInputLegacy mrInput = getMRInput(inputs); - try { - mrInput.init(); - } catch (IOException e) { - throw new RuntimeException("Failed while initializing MRInput", e); + if (mrInput != null) { + try { + mrInput.init(); + } catch (IOException e) { + throw new RuntimeException("Failed while initializing MRInput", e); + } } } else { rproc = new ReduceRecordProcessor(); @@ -208,23 +214,40 @@ void initialize() throws Exception { this.writer = (KeyValueWriter) output.getWriter(); } + @Override public void collect(Object key, Object value) throws IOException { writer.write(key, value); } } - static MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) { + static MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws InterruptedException, + IOException { //there should be only one MRInput MRInputLegacy theMRInput = null; - for(LogicalInput inp : inputs.values()){ - if(inp instanceof MRInputLegacy){ + LOG.info("The input names are: " + Arrays.toString(inputs.keySet().toArray())); + for (Entry<String, LogicalInput> inp : inputs.entrySet()) { + if (inp.getValue() instanceof MRInputLegacy) { if(theMRInput != null){ throw new IllegalArgumentException("Only one MRInput is expected"); } //a better logic would be to find the alias - theMRInput = (MRInputLegacy)inp; + theMRInput = (MRInputLegacy) inp.getValue(); + } else if (inp.getValue() instanceof MultiMRInput) { + LOG.info("Found an input of type MultiMRInput: " + inp); + multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue()); } } return theMRInput; } + + public static MultiMRInput getInput(String inputName) { + LOG.info("Returning MultiMRInput " + multiMRInputMap.get(inputName) + " for input name: " + inputName); + return multiMRInputMap.get(inputName); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java new file mode 100644 index 0000000..08c454b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.library.api.KeyValueReader; + +/** + * A KeyValueReader implementation that returns a sorted stream of key-value pairs + * by doing a sorted merge of the underlying KeyValueReaders. + * Uses a priority queue to pick the reader whose current record is next in + * sort order. + */ +public class KeyValueInputMerger extends KeyValueReader { + + public static final Log l4j = LogFactory.getLog(KeyValueInputMerger.class); + private PriorityQueue<KeyValueReader> pQueue = null; + private KeyValueReader nextKVReader = null; + + public KeyValueInputMerger(List<KeyValueReader> multiMRInputs) throws Exception { + //add the KeyValueReaders of the multi MR inputs to the priority queue + int initialCapacity = multiMRInputs.size(); + pQueue = new PriorityQueue<KeyValueReader>(initialCapacity, new KVReaderComparator()); + l4j.info("Initialized the priority queue with " + multiMRInputs.size() + " multi MR inputs"); + for (KeyValueReader input : multiMRInputs) { + addToQueue(input); + } + } + + /** + * Add a KeyValueReader to the queue if it has more key-value pairs. + * + * @param kvReader + * @throws IOException + */ + private void addToQueue(KeyValueReader kvReader) throws IOException { + if (kvReader.next()) { + pQueue.add(kvReader); + } + } + + /** + * @return true if there is another key-value pair; advances to it as a side effect + * @throws IOException + */ + @Override + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if(nextKVReader != null){ + addToQueue(nextKVReader); + } + + //get the new nextKVReader whose current record sorts lowest + nextKVReader = pQueue.poll(); + return nextKVReader != null; + } + + @Override + public Object getCurrentKey() throws IOException { + return nextKVReader.getCurrentKey(); + } + + @Override + public Object getCurrentValue() throws IOException { + return nextKVReader.getCurrentValue(); + } + + /** + * Comparator that orders KeyValueReaders by their current value + */ + class KVReaderComparator implements Comparator<KeyValueReader> { + + @Override + public int compare(KeyValueReader kvReader1, KeyValueReader kvReader2) { + try { + BinaryComparable value1 = (BinaryComparable) kvReader1.getCurrentValue(); + BinaryComparable value2 = (BinaryComparable) kvReader2.getCurrentValue(); + return value1.compareTo(value2); + } catch (IOException e) { + l4j.error("Caught exception while reading merge input", e); + //die! 
+ throw new RuntimeException(e); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java new file mode 100644 index 0000000..c06907e --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.library.api.KeyValuesReader; + +/** + * A KeyValuesReader implementation that returns a sorted stream of key-values + * by doing a sorted merge of the key-value in LogicalInputs. + * Tags are in the last byte of the key, so no special handling for tags is required. + * Uses a priority queue to pick the KeyValuesReader of the input that is next in + * sort order. 
+ */ +public class KeyValuesInputMerger extends KeyValuesReader { + + public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); + private PriorityQueue pQueue = null; + private KeyValuesReader nextKVReader = null; + + public KeyValuesInputMerger(List shuffleInputs) throws Exception { + //get KeyValuesReaders from the LogicalInput and add them to priority queue + int initialCapacity = shuffleInputs.size(); + pQueue = new PriorityQueue(initialCapacity, new KVReaderComparator()); + for(Input input : shuffleInputs){ + addToQueue((KeyValuesReader)input.getReader()); + } + } + + /** + * Add KeyValuesReader to queue if it has more key-values + * @param kvsReadr + * @throws IOException + */ + private void addToQueue(KeyValuesReader kvsReadr) throws IOException{ + if(kvsReadr.next()){ + pQueue.add(kvsReadr); + } + } + + /** + * @return true if there are more key-values and advances to next key-values + * @throws IOException + */ + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if(nextKVReader != null){ + addToQueue(nextKVReader); + } + + //get the new nextKVReader with lowest key + nextKVReader = pQueue.poll(); + return nextKVReader != null; + } + + public Object getCurrentKey() throws IOException { + return nextKVReader.getCurrentKey(); + } + + public Iterable getCurrentValues() throws IOException { + return nextKVReader.getCurrentValues(); + } + + /** + * Comparator that compares KeyValuesReader on their current key + */ + class KVReaderComparator implements Comparator { + + @Override + public int compare(KeyValuesReader kvReadr1, KeyValuesReader kvReadr2) { + try { + BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentKey(); + BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentKey(); + return key1.compareTo(key2); + } catch (IOException e) { + l4j.error("Caught exception while reading shuffle input", e); + //die! 
+ throw new RuntimeException(e); + } + } + } + + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java index d9139b8..ff405eb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java @@ -40,7 +40,7 @@ public TezMergedLogicalInput(TezMergedInputContext context, List inputs) @Override public Reader getReader() throws Exception { - return new InputMerger(getInputs()); + return new KeyValuesInputMerger(getInputs()); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 6d9739f..9b718d6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hive.ql.plan; -import java.util.LinkedList; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,11 +36,19 @@ @SuppressWarnings({"serial", "deprecation"}) public abstract class BaseWork extends AbstractOperatorDesc { + public enum VertexType { + AUTO_INITIALIZED_EDGES, // no custom vertex or edge + INITIALIZED_EDGES, // custom vertex and custom edge but single MR Input + MULTI_INPUT_INITIALIZED_EDGES, // custom vertex, custom edge and multi MR Input + MULTI_INPUT_UNINITIALIZED_EDGES // custom vertex, no custom edge, multi MR Input + } + // dummyOps is a reference to all the HashTableDummy operators in the // plan. These have to be separately initialized when we setup a task. // Their function is mainly as root ops to give the mapjoin the correct // schema info. List dummyOps; + int tag; public BaseWork() {} @@ -95,7 +103,7 @@ public void addDummyOp(HashTableDummyOperator dummyOp) { // add all children opStack.addAll(opSet); - + while(!opStack.empty()) { Operator op = opStack.pop(); returnSet.add(op); @@ -108,4 +116,12 @@ public void addDummyOp(HashTableDummyOperator dummyOp) { } public abstract void configureJobConf(JobConf job); + + public void setTag(int tag) { + this.tag = tag; + } + + public int getTag() { + return tag; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java new file mode 100644 index 0000000..1a30883 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.Operator; + +@Explain(displayName = "Common Merge Join Operator") +public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable { + private static final long serialVersionUID = 1L; + private int numBuckets; + private boolean isSubQuery; + private int mapJoinConversionPos; + private JoinDesc bigTableJoinDesc; + private Map> aliasToSinkMap; + private boolean genJoinKeys = true; + + CommonMergeJoinDesc() { + } + + public CommonMergeJoinDesc(int numBuckets, boolean isSubQuery, int mapJoinConversionPos, + MapJoinDesc joinDesc) { + super(joinDesc); + this.numBuckets = numBuckets; + this.isSubQuery = isSubQuery; + this.mapJoinConversionPos = mapJoinConversionPos; + } + + public boolean getCustomMerge() { + return isSubQuery; + } + + public int getNumBuckets() { + return numBuckets; + } + + public int getBigTablePosition() { + return mapJoinConversionPos; + } + + public Map getTagToAlias() { + // FIXME this is supposed to be populated in the planning phase. Has a + // parent index to input name mapping. + return null; + } + + public Map> getAliasToSinkMap() { + return aliasToSinkMap; + } + + public void setAliasToSinkMap(Map> aliasToSinkMap) { + this.aliasToSinkMap = aliasToSinkMap; + } + + public void setGenJoinKeys(boolean genJoinKeys) { + this.genJoinKeys = genJoinKeys; + } + + public boolean getGenJoinKeys() { + return genJoinKeys; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 1d96c5d..a2e40db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -26,9 +26,9 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -120,6 +120,10 @@ private Map> scratchColumnMap = null; private boolean vectorMode = false; + private boolean doSplitsGrouping = true; + + private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES; + public MapWork() {} public MapWork(String name) { @@ -555,4 +559,45 @@ public void logPathToAliases() { } } } + + public void setDoSplitsGrouping(boolean doSplitsGrouping) { + this.doSplitsGrouping = doSplitsGrouping; + } + + public boolean getDoSplitsGrouping() { + return this.doSplitsGrouping; + } + + public void setVertexType(VertexType incomingVertexType) { + switch (this.vertexType) { + case INITIALIZED_EDGES: + if (incomingVertexType == VertexType.MULTI_INPUT_UNINITIALIZED_EDGES) { + vertexType = VertexType.MULTI_INPUT_INITIALIZED_EDGES; + } + break; + + case MULTI_INPUT_INITIALIZED_EDGES: + // nothing to do + break; + + case MULTI_INPUT_UNINITIALIZED_EDGES: + if (incomingVertexType == VertexType.INITIALIZED_EDGES) { + vertexType = VertexType.MULTI_INPUT_INITIALIZED_EDGES; + } + break; + + case AUTO_INITIALIZED_EDGES: + vertexType = incomingVertexType; + break; + + default: + break; + } + this.vertexType = vertexType; + } + + public VertexType getVertexType() { + // TODO Auto-generated method stub + return this.vertexType; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java new file mode 100644 index 0000000..52cf1c5 --- /dev/null +++ 
ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java @@ -0,0 +1,88 @@ +package org.apache.hadoop.hive.ql.plan; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.mapred.JobConf; + +public class MergeJoinWork extends BaseWork { + + private final List mergeJoinOpList = + new ArrayList(); + private final List mergeWorkList = new ArrayList(); + private int bigTableInputId; + private boolean isBigTableWork; + private BaseWork bigTableWork; + + public MergeJoinWork() { + super(); + } + + public MergeJoinWork(BaseWork work) { + super(work.getName()); + } + + @Explain(displayName = "Vertex") + @Override + public String getName() { + return super.getName(); + } + + @Override + public void replaceRoots(Map, Operator> replacementMap) { + } + + @Override + public Set> getAllRootOperators() { + return new HashSet>(); + } + + @Override + public void configureJobConf(JobConf job) { + } + + public List getMergeJoinOperator() { + return this.mergeJoinOpList; + } + + public void addMergeJoinOperator(CommonMergeJoinOperator mergeJoinOp) { + this.mergeJoinOpList.add(mergeJoinOp); + this.bigTableInputId = mergeJoinOp.getConf().getBigTablePosition(); + } + + public void addMergedWork(BaseWork work, BaseWork connectWork) { + if (work != null) { + if ((bigTableWork != null) && (bigTableWork != work)) { + assert false; + } + this.bigTableWork = work; + setName(work.getName()); + if (work instanceof MapWork) { + MapWork mapWork = (MapWork) work; + mapWork.setVertexType(VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + mapWork.setDoSplitsGrouping(false); + } + } + + if (connectWork != null) { + this.mergeWorkList.add(connectWork); + } + } + + public List getBaseWorkList() { + return mergeWorkList; + } + + public String getBigTableAlias() { + return ((MapWork) bigTableWork).getAliasToWork().keySet().iterator().next(); + } + + public BaseWork getMainWork() { + return bigTableWork; + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java index c96fc2d..4dff98a 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java @@ -51,7 +51,7 @@ protected MyField() { super(); } - + public MyField(int fieldID, String fieldName, ObjectInspector fieldObjectInspector) { this.fieldID = fieldID; @@ -65,18 +65,22 @@ public MyField(int fieldID, String fieldName, this.fieldComment = fieldComment; } + @Override public int getFieldID() { return fieldID; } + @Override public String getFieldName() { return fieldName; } + @Override public ObjectInspector getFieldObjectInspector() { return fieldObjectInspector; } + @Override public String getFieldComment() { return fieldComment; } @@ -115,6 +119,9 @@ protected void init(List structFieldNames, fields = new ArrayList(structFieldNames.size()); for (int i = 0; i < structFieldNames.size(); i++) { + LOG.info("i = " + i + " structFieldNames: " + structFieldNames + + " structFieldObjectInspectors: " + structFieldObjectInspectors + + " structFieldComments: " + structFieldComments); fields.add(new MyField(i, structFieldNames.get(i), structFieldObjectInspectors.get(i), structFieldComments 
== null ? null : structFieldComments.get(i))); @@ -133,10 +140,12 @@ protected void init(List fields) { } } + @Override public String getTypeName() { return ObjectInspectorUtils.getStandardStructTypeName(this); } + @Override public final Category getCategory() { return Category.STRUCT; }
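
Editor's note (not part of the patch above): KeyValueInputMerger and KeyValuesInputMerger both implement the same k-way merge technique — keep one reader per sorted input in a priority queue ordered by the reader's current record, and on each next() re-insert the previously returned reader (if it still has data) before polling the new minimum. The sketch below illustrates that technique in isolation; SortedReaderMerger, its Reader interface, and the sample data are hypothetical stand-ins for Tez's KeyValueReader and do not exist in Hive or Tez.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class SortedReaderMerger {

  /** Hypothetical stand-in for Tez's KeyValueReader: call next() to advance, then current() to read. */
  public interface Reader {
    boolean next();
    String current();
  }

  private final PriorityQueue<Reader> queue;
  private Reader currentReader;

  public SortedReaderMerger(List<Reader> readers) {
    // Readers are ordered by the record they currently expose, so the queue
    // head is always the input whose next record is smallest overall.
    queue = new PriorityQueue<Reader>(Math.max(1, readers.size()), new Comparator<Reader>() {
      @Override
      public int compare(Reader a, Reader b) {
        return a.current().compareTo(b.current());
      }
    });
    for (Reader r : readers) {
      if (r.next()) {          // only enqueue readers that actually have data
        queue.add(r);
      }
    }
  }

  /** Advances to the next record in global sort order (mirrors KeyValueInputMerger.next()). */
  public boolean next() {
    if (currentReader != null && currentReader.next()) {
      queue.add(currentReader); // re-insert so it is re-ordered by its new current record
    }
    currentReader = queue.poll();
    return currentReader != null;
  }

  public String current() {
    return currentReader.current();
  }

  /** Tiny usage example over two pre-sorted inputs; prints a, b, c, d. */
  public static void main(String[] args) {
    List<Reader> readers = new ArrayList<Reader>();
    readers.add(fromList(Arrays.asList("a", "c")));
    readers.add(fromList(Arrays.asList("b", "d")));
    SortedReaderMerger merger = new SortedReaderMerger(readers);
    while (merger.next()) {
      System.out.println(merger.current());
    }
  }

  private static Reader fromList(final List<String> data) {
    return new Reader() {
      private final Iterator<String> it = data.iterator();
      private String cur;
      @Override
      public boolean next() {
        if (it.hasNext()) {
          cur = it.next();
          return true;
        }
        return false;
      }
      @Override
      public String current() {
        return cur;
      }
    };
  }
}

Re-inserting the previous reader before polling is what keeps the merge correct: each reader is compared only on the record it currently exposes, so the queue head is always the globally smallest unreturned record, and a reader that runs out of data simply drops out of the queue.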