diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7f4afd9..7348417 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -367,6 +367,11 @@ "jdbc:derby:;databaseName=metastore_db;create=true", "JDBC connect string for a JDBC metastore"), + METASTOREFORCERELOADCONF("hive.metastore.force.reload.conf", false, + "Whether to force reloading of the metastore configuration (including\n" + + "the connection URL), before the next metastore query that accesses the\n" + + "datastore. Once reloaded, this value is reset to false. Used for\n" + + "testing only."), HMSHANDLERATTEMPTS("hive.hmshandler.retry.attempts", 1, "The number of times to retry a HMSHandler call if there were a connection error"), HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", 1000, @@ -1649,7 +1654,7 @@ HIVE_JAR_DIRECTORY("hive.jar.directory", null, "This is the location hive in tez mode will look for to find a site wide \n" + "installed hive instance."), - HIVE_USER_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/", + HIVE_USER_INSTALL_DIR("hive.user.install.directory", "/user/", "If hive (in tez mode only) cannot find a usable hive jar in \"hive.jar.directory\", \n" + "it will upload the hive jar to \"hive.user.install.directory/user.name\"\n" + "and use it to run queries."), @@ -1729,7 +1734,13 @@ "When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges."), TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f, "When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number\n" + - "of reducers that tez specifies.") + "of reducers that tez specifies."), + TEZ_MERGE_FILE_PREFIX( + "hive.tez.merge.file.prefix", "", + "When using the merge join operation in the map phase, we will set this " + + "to the work name of the merge work"), + TEZ_MERGE_WORK_FILE_PREFIXES("hive.tez.merge.file.prefixes", "", + "A comma separated list of work names used as prefixes.") ; public final String varname; diff --git ql/if/queryplan.thrift ql/if/queryplan.thrift index eafbe5a..dbf35e6 100644 --- ql/if/queryplan.thrift +++ ql/if/queryplan.thrift @@ -56,6 +56,7 @@ enum OperatorType { PTF, MUX, DEMUX, + MERGEJOIN, } struct Operator { diff --git ql/src/gen/thrift/gen-cpp/queryplan_types.cpp ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index 96dbb29..34e9007 100644 --- ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -51,7 +51,8 @@ int _kOperatorTypeValues[] = { OperatorType::HASHTABLEDUMMY, OperatorType::PTF, OperatorType::MUX, - OperatorType::DEMUX + OperatorType::DEMUX, + OperatorType::MERGEJOIN }; const char* _kOperatorTypeNames[] = { "JOIN", @@ -74,9 +75,10 @@ const char* _kOperatorTypeNames[] = { "HASHTABLEDUMMY", "PTF", "MUX", - "DEMUX" + "DEMUX", + "MERGEJOIN" }; -const std::map _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _kTaskTypeValues[] = { TaskType::MAP, diff --git ql/src/gen/thrift/gen-cpp/queryplan_types.h ql/src/gen/thrift/gen-cpp/queryplan_types.h index 634dd55..71b5c88 100644 --- ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ 
ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -56,7 +56,8 @@ struct OperatorType { HASHTABLEDUMMY = 17, PTF = 18, MUX = 19, - DEMUX = 20 + DEMUX = 20, + MERGEJOIN = 21 }; }; diff --git ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java index aa094ee..b286e06 100644 --- ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java +++ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java @@ -32,7 +32,8 @@ HASHTABLEDUMMY(17), PTF(18), MUX(19), - DEMUX(20); + DEMUX(20), + MERGEJOIN(21); private final int value; @@ -95,6 +96,8 @@ public static OperatorType findByValue(int value) { return MUX; case 20: return DEMUX; + case 21: + return MERGEJOIN; default: return null; } diff --git ql/src/gen/thrift/gen-php/Types.php ql/src/gen/thrift/gen-php/Types.php index 5164b2c..f97ebf8 100644 --- ql/src/gen/thrift/gen-php/Types.php +++ ql/src/gen/thrift/gen-php/Types.php @@ -56,6 +56,7 @@ final class OperatorType { const PTF = 18; const MUX = 19; const DEMUX = 20; + const MERGEJOIN = 21; static public $__names = array( 0 => 'JOIN', 1 => 'MAPJOIN', @@ -78,6 +79,7 @@ final class OperatorType { 18 => 'PTF', 19 => 'MUX', 20 => 'DEMUX', + 21 => 'MERGEJOIN', ); } diff --git ql/src/gen/thrift/gen-py/queryplan/ttypes.py ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 2a3f745..6c8970c 100644 --- ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -66,6 +66,7 @@ class OperatorType: PTF = 18 MUX = 19 DEMUX = 20 + MERGEJOIN = 21 _VALUES_TO_NAMES = { 0: "JOIN", @@ -89,6 +90,7 @@ class OperatorType: 18: "PTF", 19: "MUX", 20: "DEMUX", + 21: "MERGEJOIN", } _NAMES_TO_VALUES = { @@ -113,6 +115,7 @@ class OperatorType: "PTF": 18, "MUX": 19, "DEMUX": 20, + "MERGEJOIN": 21, } class TaskType: diff --git ql/src/gen/thrift/gen-rb/queryplan_types.rb ql/src/gen/thrift/gen-rb/queryplan_types.rb index 35e1f0f..82ae7a5 100644 --- ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -42,8 +42,9 @@ module OperatorType PTF = 18 MUX = 19 DEMUX = 20 - VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX"} - VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX]).freeze + MERGEJOIN = 21 + VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "MERGEJOIN"} + VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, MERGEJOIN]).freeze end module TaskType diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java index 8c1067e..551e83c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java @@ -61,29 +61,30 @@ public AbstractMapJoinOperator(AbstractMapJoinOperator mj @Override @SuppressWarnings("unchecked") protected void initializeOp(Configuration hconf) throws HiveException { - int tagLen = conf.getTagLength(); - - joinKeys = new List[tagLen]; - - JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE); - joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys, - inputObjInspectors,NOTSKIPBIGTABLE, tagLen); + boolean mapSideJoin = conf.isMapSideJoin(); + if (mapSideJoin) { + int tagLen = conf.getTagLength(); + joinKeys = new List[tagLen]; + JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE); + joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys, + inputObjInspectors,NOTSKIPBIGTABLE, tagLen); + } super.initializeOp(hconf); + emptyList = new RowContainer>(1, hconf, reporter); - numMapRowsRead = 0; - firstRow = true; - - // all other tables are small, and are cached in the hash table - posBigTable = (byte) conf.getPosBigTable(); + if (mapSideJoin) { + numMapRowsRead = 0; + firstRow = true; - emptyList = new RowContainer>(1, hconf, reporter); + // all other tables are small, and are cached in the hash table + posBigTable = (byte) conf.getPosBigTable(); - RowContainer> bigPosRC = JoinUtil.getRowContainer(hconf, - rowContainerStandardObjectInspectors[posBigTable], - posBigTable, joinCacheSize,spillTableDesc, conf, - !hasFilter(posBigTable), reporter); - storage[posBigTable] = bigPosRC; + RowContainer> bigPosRC = + JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[posBigTable], + posBigTable, joinCacheSize, spillTableDesc, conf, !hasFilter(posBigTable), reporter); + storage[posBigTable] = bigPosRC; + } initializeChildren(hconf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 3110b0a..26e6629 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -217,7 +217,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { joinFilters = new List[tagLen]; JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE); - joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues, inputObjInspectors,NOTSKIPBIGTABLE, tagLen); joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters, @@ -225,9 +224,12 @@ protected void initializeOp(Configuration hconf) throws HiveException { joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors( joinValuesObjectInspectors,NOTSKIPBIGTABLE, tagLen); + LOG.info("Our join values standard object inspectors are : " + + joinValuesStandardObjectInspectors.length); filterMaps = conf.getFilterMap(); if (noOuterJoin) { + LOG.info("NO outer join."); rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors; } else { List[] rowContainerObjectInspectors = new List[tagLen]; @@ -638,6 +640,7 @@ public void endGroup() throws HiveException { } protected void internalForward(Object row, ObjectInspector outputOI) throws 
HiveException { + LOG.info("Forwarding row: " + row); forward(row, outputOI); } @@ -707,6 +710,7 @@ protected void checkAndGenObject() throws HiveException { if (allOne) { genAllOneUniqueJoinObject(); } else { + LOG.info("calling genUniqueJoinObject"); genUniqueJoinObject(0, 0); } } else { @@ -719,7 +723,7 @@ if (noOuterJoin) { if (alw.rowCount() == 0) { - LOG.trace("No data for alias=" + i); + LOG.info("No data for alias=" + i); return; } else if (alw.rowCount() > 1) { mayHasMoreThanOne = true; @@ -749,11 +753,17 @@ } if (!hasEmpty && !mayHasMoreThanOne) { + LOG.info("calling genAllOneUniqueJoinObject"); genAllOneUniqueJoinObject(); + LOG.info("called genAllOneUniqueJoinObject"); } else if (!hasEmpty && !hasLeftSemiJoin) { + LOG.info("calling genUniqueJoinObject"); genUniqueJoinObject(0, 0); + LOG.info("called genUniqueJoinObject"); } else { + LOG.info("calling genJoinObject"); genJoinObject(); + LOG.info("called genJoinObject"); } } Arrays.fill(aliasFilterTags, (byte)0xff); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java new file mode 100644 index 0000000..26faba4 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; +import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; + +/* + * This operator was introduced to consolidate the join algorithms into either hash-based joins + * (MapJoinOperator) or sort-merge based joins. It executes a sort-merge algorithm and replaces both + * the JoinOperator and the SMBMapJoinOperator on the Tez side. It works in either the map phase or + * the reduce phase. + * + * The basic algorithm is as follows: + * + * 1. The processOp receives a row from a "big" table. + * 2. In order to process it, the operator does a fetch for rows from the other tables. + * 3. Once we have a set of rows from the other tables (till we hit a new key), more rows are + * brought in from the big table and a join is performed. 
+ */ + +public class CommonMergeJoinOperator extends AbstractMapJoinOperator implements + Serializable { + + private static final long serialVersionUID = 1L; + private boolean isBigTableWork; + private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName()); + private Map aliasToInputNameMap; + transient List[] keyWritables; + transient List[] nextKeyWritables; + transient RowContainer>[] nextGroupStorage; + transient RowContainer>[] candidateStorage; + + transient String[] tagToAlias; + private transient boolean[] fetchDone; + private transient boolean[] foundNextKeyGroup; + transient boolean firstFetchHappened = false; + transient boolean localWorkInited = false; + transient boolean initDone = false; + transient List otherKey = null; + transient List values = null; + + public CommonMergeJoinOperator() { + super(); + } + + @SuppressWarnings("unchecked") + @Override + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + initializeChildren(hconf); + int maxAlias = 0; + for (byte pos = 0; pos < order.length; pos++) { + if (pos > maxAlias) { + maxAlias = pos; + } + } + maxAlias += 1; + + nextGroupStorage = new RowContainer[maxAlias]; + candidateStorage = new RowContainer[maxAlias]; + keyWritables = new ArrayList[maxAlias]; + nextKeyWritables = new ArrayList[maxAlias]; + fetchDone = new boolean[maxAlias]; + foundNextKeyGroup = new boolean[maxAlias]; + + int bucketSize; + + int oldVar = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE); + if (oldVar != 100) { + bucketSize = oldVar; + } else { + bucketSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESMBJOINCACHEROWS); + } + + for (byte pos = 0; pos < order.length; pos++) { + RowContainer> rc = + JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], pos, + bucketSize, spillTableDesc, conf, !hasFilter(pos), reporter); + nextGroupStorage[pos] = rc; + RowContainer> candidateRC = + JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], pos, + bucketSize, spillTableDesc, conf, !hasFilter(pos), reporter); + candidateStorage[pos] = candidateRC; + } + + for (byte pos = 0; pos < order.length; pos++) { + if (pos != posBigTable) { + fetchDone[pos] = false; + } + foundNextKeyGroup[pos] = false; + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.ql.exec.Operator#processOp(java.lang.Object, + * int) this processor has a push-pull model. First call to this method is a + * push but the rest is pulled until we run out of records. + */ + @Override + public void processOp(Object row, int tag) throws HiveException { + posBigTable = (byte) conf.getBigTablePosition(); + + byte alias = (byte) tag; + LOG.info("Tag in process op is " + tag); + List value = getFilteredValue(alias, row); + // compute keys and values as StandardObjects + List key = mergeJoinComputeKeys(row, alias); + + if (!firstFetchHappened) { + firstFetchHappened = true; + // fetch the first group for all small table aliases + for (byte pos = 0; pos < order.length; pos++) { + if (pos != posBigTable) { + fetchNextGroup(pos); + } + } + } + + //have we reached a new key group? + boolean nextKeyGroup = processKey(alias, key); + LOG.info("Next key group? 
" + nextKeyGroup); + if (nextKeyGroup) { + //assert this.nextGroupStorage[alias].size() == 0; + this.nextGroupStorage[alias].addRow(value); + foundNextKeyGroup[tag] = true; + if (tag != posBigTable) { + LOG.info("tag " + tag + " != posBigTable " + posBigTable); + return; + } + } + + reportProgress(); + numMapRowsRead++; + + // the big table has reached a new key group. try to let the small tables + // catch up with the big table. + if (nextKeyGroup) { + assert tag == posBigTable; + List smallestPos = null; + LOG.info("Doing the join."); + do { + smallestPos = joinOneGroup(); + //jump out the loop if we need input from the big table + } while (smallestPos != null && smallestPos.size() > 0 + && !smallestPos.contains(this.posBigTable)); + + return; + } + + assert !nextKeyGroup; + candidateStorage[tag].addRow(value); + + } + + private List joinOneGroup() throws HiveException { + int[] smallestPos = findSmallestKey(); + List listOfNeedFetchNext = null; + if (smallestPos != null) { + listOfNeedFetchNext = joinObject(smallestPos); + if (listOfNeedFetchNext.size() > 0) { + // listOfNeedFetchNext contains all tables that we have joined data in their + // candidateStorage, and we need to clear candidate storage and promote their + // nextGroupStorage to candidateStorage and fetch data until we reach a + // new group. + for (Byte b : listOfNeedFetchNext) { + try { + fetchNextGroup(b); + } catch (Exception e) { + throw new HiveException(e); + } + } + } + } + return listOfNeedFetchNext; + } + + private List joinObject(int[] smallestPos) throws HiveException { + List needFetchList = new ArrayList(); + byte index = (byte) (smallestPos.length - 1); + for (; index >= 0; index--) { + if (smallestPos[index] > 0 || keyWritables[index] == null) { + putDummyOrEmpty(index); + continue; + } + storage[index] = candidateStorage[index]; + needFetchList.add(index); + if (smallestPos[index] < 0) { + break; + } + } + for (index--; index >= 0; index--) { + putDummyOrEmpty(index); + } + LOG.info("Check and gen the object"); + checkAndGenObject(); + for (Byte pos : needFetchList) { + this.candidateStorage[pos].clearRows(); + LOG.info("Key writables being nulled for pos: " + pos); + this.keyWritables[pos] = null; + } + return needFetchList; + } + + private void putDummyOrEmpty(Byte i) { + // put a empty list or null + if (noOuterJoin) { + storage[i] = emptyList; + } else { + storage[i] = dummyObjVectors[i]; + } + } + + private int[] findSmallestKey() { + int[] result = new int[order.length]; + List smallestOne = null; + + for (byte pos = 0; pos < order.length; pos++) { + List key = keyWritables[pos]; + if (key == null) { + continue; + } + if (smallestOne == null) { + smallestOne = key; + result[pos] = -1; + continue; + } + result[pos] = compareKeys(key, smallestOne); + if (result[pos] < 0) { + smallestOne = key; + } + } + return smallestOne == null ? null : result; + } + + private void fetchNextGroup(Byte t) throws HiveException { + LOG.info("Fetching next group for alias: " + t); + if (foundNextKeyGroup[t]) { + // first promote the next group to be the current group if we reached a + // new group in the previous fetch + if ((this.nextKeyWritables[t] != null) || (this.fetchDone[t] == false)) { + promoteNextGroupToCandidate(t); + } else { + LOG.info("Key writables being nulled " + t); + this.keyWritables[t] = null; + this.candidateStorage[t] = null; + this.nextGroupStorage[t] = null; + } + foundNextKeyGroup[t] = false; + } + // for the big table, we only need to promote the next group to the current group. 
+ if (t == posBigTable) { + return; + } + + // for tables other than the big table, we need to fetch more data until we reach a new group + // or are done. + while (!foundNextKeyGroup[t]) { + if (fetchDone[t]) { + break; + } + fetchOneRow(t); + } + if (!foundNextKeyGroup[t] && fetchDone[t]) { + this.nextKeyWritables[t] = null; + } + } + + @Override + public void closeOp(boolean abort) throws HiveException { + joinFinalLeftData(); + + // clean up + for (int pos = 0; pos < order.length; pos++) { + if (pos != posBigTable) { + fetchDone[pos] = false; + } + foundNextKeyGroup[pos] = false; + } + } + + private void fetchOneRow(byte tag) throws HiveException { + try { + if (getParentOperators() == null || (getParentOperators().isEmpty())) { + @SuppressWarnings("unchecked") + List keyValues = + (List) ReduceRecordProcessor.getNextRow(aliasToInputNameMap.get(Integer + .valueOf(tag))); + if (keyValues == null) { + fetchDone[tag] = true; + return; + } + int length = keyValues.size(); + for (int i = 1; i < length; i++) { + List objList = new ArrayList(); + objList.add(keyValues.get(0)); // get the key + objList.add(keyValues.get(i)); + this.processOp(objList, tag); + } + // instead of maintaining complex state for the fetch of the next group, we know for sure + // that at the end of all the values for a given key, we will definitely reach the next key + // group. + foundNextKeyGroup[tag] = true; + } else { + Object rowOfSide = null; + LOG.info("Dumping alias to input name map: " + aliasToInputNameMap.toString() + " tag: " + + tag + " input name: " + aliasToInputNameMap.get(Integer.valueOf(tag)) + " int tag: " + + Integer.valueOf(tag)); + rowOfSide = + getParentOperators().get(tag).getNextRow(aliasToInputNameMap.get(Integer.valueOf(tag))); + InspectableObject inspectableObj = (InspectableObject) rowOfSide; + if (inspectableObj == null) { + fetchDone[tag] = true; + return; + } + this.processOp(inspectableObj.o, tag); + } + } catch (Exception e) { + throw new HiveException(e); + } + } + + private void joinFinalLeftData() throws HiveException { + LOG.info("Join final left data"); + @SuppressWarnings("rawtypes") + RowContainer bigTblRowContainer = this.candidateStorage[this.posBigTable]; + + boolean allFetchDone = allFetchDone(); + // if all left data in the small tables is less than or equal to the left data + // in the big table, let them catch up + while (bigTblRowContainer != null && bigTblRowContainer.rowCount() > 0 && !allFetchDone) { + joinOneGroup(); + bigTblRowContainer = this.candidateStorage[this.posBigTable]; + allFetchDone = allFetchDone(); + } + + while (!allFetchDone) { + List ret = joinOneGroup(); + if (ret == null || ret.size() == 0) { + break; + } + reportProgress(); + numMapRowsRead++; + allFetchDone = allFetchDone(); + } + + boolean dataInCache = true; + while (dataInCache) { + for (byte pos = 0; pos < order.length; pos++) { + if (this.foundNextKeyGroup[pos] && this.nextKeyWritables[pos] != null) { + promoteNextGroupToCandidate(pos); + } + } + joinOneGroup(); + dataInCache = false; + for (byte pos = 0; pos < order.length; pos++) { + if (this.candidateStorage[pos].rowCount() > 0) { + dataInCache = true; + break; + } + } + } + } + + private boolean allFetchDone() { + boolean allFetchDone = true; + for (byte pos = 0; pos < order.length; pos++) { + if (pos == posBigTable) { + continue; + } + allFetchDone = allFetchDone && fetchDone[pos]; + } + return allFetchDone; + } + + private void promoteNextGroupToCandidate(Byte t) throws HiveException { + LOG.info("Setting the key writables for the next key for alias " + 
t); + this.keyWritables[t] = this.nextKeyWritables[t]; + this.nextKeyWritables[t] = null; + RowContainer> oldRowContainer = this.candidateStorage[t]; + oldRowContainer.clearRows(); + this.candidateStorage[t] = this.nextGroupStorage[t]; + this.nextGroupStorage[t] = oldRowContainer; + } + + private boolean processKey(byte alias, List key) throws HiveException { + List keyWritable = keyWritables[alias]; + LOG.info("Key writable: " + keyWritable); + if (keyWritable == null) { + // the first group. + LOG.info("Key writables being set to " + key + " for alias: " + alias); + keyWritables[alias] = key; + return false; + } else { + int cmp = compareKeys(key, keyWritable); + LOG.info("Comparing keys: " + key + " key writable: " + keyWritable + " cmp: " + cmp); + if (cmp != 0) { + nextKeyWritables[alias] = key; + return true; + } + return false; + } + } + + @SuppressWarnings("rawtypes") + private int compareKeys(List k1, List k2) { + int ret = 0; + + // join keys have difference sizes? + ret = k1.size() - k2.size(); + if (ret != 0) { + return ret; + } + + for (int i = 0; i < k1.size(); i++) { + WritableComparable key_1 = (WritableComparable) k1.get(i); + WritableComparable key_2 = (WritableComparable) k2.get(i); + if (key_1 == null && key_2 == null) { + return nullsafes != null && nullsafes[i] ? 0 : -1; // just return k1 is + // smaller than k2 + } else if (key_1 == null) { + return -1; + } else if (key_2 == null) { + return 1; + } + ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2); + if (ret != 0) { + return ret; + } + } + return ret; + } + + @SuppressWarnings("unchecked") + private List mergeJoinComputeKeys(Object row, Byte alias) throws HiveException { + if ((joinKeysObjectInspectors != null) && (joinKeysObjectInspectors[alias] != null)) { + return JoinUtil.computeKeys(row, joinKeys[alias], joinKeysObjectInspectors[alias]); + } else { + row = + ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[alias], + ObjectInspectorCopyOption.WRITABLE); + StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[alias]; + StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString()); + return (List) soi.getStructFieldData(row, sf); + } + } + + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "MERGEJOIN"; + } + + @Override + public OperatorType getType() { + return OperatorType.MERGEJOIN; + } + + @Override + public void initializeLocalWork(Configuration hconf) throws HiveException { + Operator parent = null; + + for (Operator parentOp : parentOperators) { + if (parentOp != null) { + parent = parentOp; + break; + } + } + + if (parent == null) { + throw new HiveException("No valid parents."); + } + Map dummyOps = parent.getConnectOps(); + for (Entry connectOp : dummyOps.entrySet()) { + parentOperators.add(connectOp.getValue(), connectOp.getKey()); + connectOp.getKey().getChildOperators().add(this); + } + super.initializeLocalWork(hconf); + return; + } + + public boolean isBigTableWork() { + return isBigTableWork; + } + + public void setIsBigTableWork(boolean bigTableWork) { + this.isBigTableWork = bigTableWork; + } + + public Map getAliasToInputNameMap() { + return aliasToInputNameMap; + } + + public void setAliasToInputNameMap(Map aliasToInputNameMap) { + this.aliasToInputNameMap = aliasToInputNameMap; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java index b8f5227..0ac8d62 
100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java @@ -74,11 +74,11 @@ public DummyStoreOperator() { @Override protected void initializeOp(Configuration hconf) throws HiveException { /* - * The conversion to standard object inspector was necessitated by HIVE-5973. The issue - * happens when a select operator preceeds this operator as in the case of a subquery. The - * select operator does not allocate a new object to hold the deserialized row. This affects + * The conversion to standard object inspector was necessitated by HIVE-5973. The issue + * happens when a select operator preceeds this operator as in the case of a subquery. The + * select operator does not allocate a new object to hold the deserialized row. This affects * the operation of the SMB join which puts the object in a priority queue. Since all elements - * of the priority queue point to the same object, the join was resulting in incorrect + * of the priority queue point to the same object, the join was resulting in incorrect * results. * * So the fix is to make a copy of the object as done in the processOp phase below. This @@ -110,4 +110,19 @@ public InspectableObject getResult() { public OperatorType getType() { return OperatorType.FORWARD; } + + @Override + public Object getNextRow(String string) throws Exception { + // all operators except the MapOperator just fetch and process the row. + if (getParentOperators().size() == 0) { + // this is a reduce side operation. Need to fetch from the ReduceRecordProcessor + return null; + } + + // this call ensures that we compute a result. + if (getParentOperators().get(0).getNextRow(string) == null) { + return null; + } + return result; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index b1f8358..89d2df9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -33,8 +33,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor; +import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; @@ -181,7 +182,7 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, PartitionDesc pd = ctx.partDesc; TableDesc td = pd.getTableDesc(); - + MapOpCtx opCtx = new MapOpCtx(); // Use table properties in case of unpartitioned tables, // and the union of table properties and partition properties, with partition @@ -205,42 +206,42 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter( partRawRowObjectInspector, opCtx.tblRawRowObjectInspector); - + // Next check if this table has partitions and if so // get the list of partition names as well as allocate // the serdes for the partition columns String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - + if (pcols != null && pcols.length() > 0) { String[] partKeys = pcols.trim().split("/"); String pcolTypes = 
overlayedProps .getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); String[] partKeyTypes = pcolTypes.trim().split(":"); - + if (partKeys.length > partKeyTypes.length) { throw new HiveException("Internal error : partKeys length, " +partKeys.length + " greater than partKeyTypes length, " + partKeyTypes.length); } - + List partNames = new ArrayList(partKeys.length); Object[] partValues = new Object[partKeys.length]; List partObjectInspectors = new ArrayList(partKeys.length); - + for (int i = 0; i < partKeys.length; i++) { String key = partKeys[i]; partNames.add(key); ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector (TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i])); - + // Partitions do not exist for this table if (partSpec == null) { // for partitionless table, initialize partValue to null partValues[i] = null; } else { - partValues[i] = + partValues[i] = ObjectInspectorConverters. getConverter(PrimitiveObjectInspectorFactory. - javaStringObjectInspector, oi).convert(partSpec.get(key)); + javaStringObjectInspector, oi).convert(partSpec.get(key)); } partObjectInspectors.add(oi); } @@ -278,6 +279,7 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, if (opCtx.hasVC()) { inspectors.add(opCtx.vcsObjectInspector); } + opCtx.rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(inspectors); return opCtx; } @@ -341,6 +343,56 @@ private boolean isPartitioned(PartitionDesc pd) { return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty(); } + public void setChildrenSecondary(Configuration hconf) throws HiveException { + + List> children = + new ArrayList>(); + + Map convertedOI = getConvertedOI(hconf); + try { + for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + String onefile = entry.getKey(); + List aliases = entry.getValue(); + + Path onepath = new Path(onefile); + + PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); + + for (String onealias : aliases) { + Operator op = conf.getAliasToWork().get(onealias); + MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); + if (opCtxMap.containsKey(inp)) { + continue; + } + MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI); + opCtxMap.put(inp, opCtx); + + op.setParentOperators(new ArrayList>()); + op.getParentOperators().add(this); + LOG.info("MapOPerator: operator is " + op); + // check for the operators who will process rows coming to this Map + // Operator + children.add(op); + childrenOpToOpCtxMap.put(op, opCtx); + LOG.info("dump " + op + " " + + opCtxMap.get(inp).rowObjectInspector.getTypeName()); + current = opCtx; // just need for TestOperators.testMapOperator + } + } + + if (children.size() == 0) { + // didn't find match for input file path in configuration! + // serious problem .. + throw new HiveException("Unable to set up children"); + } + + // we found all the operators that we are supposed to process. 
+ setChildOperators(children); + } catch (Exception e) { + throw new HiveException(e); + } + } + public void setChildren(Configuration hconf) throws HiveException { Path fpath = IOContext.get().getInputPath(); @@ -354,6 +406,8 @@ public void setChildren(Configuration hconf) throws HiveException { try { for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + LOG.info("Path to aliases: " + entry.getKey() + " values: " + + Arrays.toString(entry.getValue().toArray())); String onefile = entry.getKey(); List aliases = entry.getValue(); @@ -367,7 +421,7 @@ public void setChildren(Configuration hconf) throws HiveException { for (String onealias : aliases) { Operator op = conf.getAliasToWork().get(onealias); if (LOG.isDebugEnabled()) { - LOG.debug("Adding alias " + onealias + " to work list for file " + LOG.info("Adding alias " + onealias + " to work list for file " + onefile); } MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); @@ -639,4 +693,23 @@ public OperatorType getType() { return null; } + @Override + public Object getNextRow(String inputName) throws Exception { + // This map operator definitely belongs to merge work. + Object nextRow = MapRecordProcessor.getNextRow(inputName); + ExecMapperContext context = getExecContext(); + boolean resetValue = context.inputFileChanged(); + context.setSkipFileChangedCheck(true); + if (nextRow == null) { + return nextRow; + } + process((Writable) nextRow); + context.setSkipFileChangedCheck(resetValue); + return nextRow; + } + + @Override + public Map getConnectOps() { + return MapRecordProcessor.getConnectOps(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index db94271..bb0ed07 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -146,6 +146,7 @@ public int getNumChild() { /** * Implements the getChildren function for the Node Interface. 
*/ + @Override public ArrayList getChildren() { if (getChildOperators() == null) { @@ -309,10 +310,13 @@ protected boolean areAllParentsInitialized() { //return true; continue; } + LOG.info("Parent being checked for initialization: " + parent); if (parent.state != State.INIT) { + LOG.info("Parent state is not initialized: " + parent); return false; } } + LOG.info("Parents are initialized"); return true; } @@ -335,6 +339,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs) this.configuration = hconf; if (!areAllParentsInitialized()) { + LOG.info("Not all parents initialized"); return; } @@ -377,6 +382,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs) //pass the exec context to child operators passExecContext(this.execContext); + LOG.info("Initialize the operator - " + getName()); initializeOp(hconf); // sanity check @@ -450,6 +456,8 @@ public void passExecContext(ExecMapperContext execContext) { */ protected void initialize(Configuration hconf, ObjectInspector inputOI, int parentId) throws HiveException { + LOG.info("id is " + id); + LOG.info("name is " + getName()); LOG.info("Initializing child " + id + " " + getName()); // Double the size of the array if needed if (parentId >= inputObjInspectors.length) { @@ -632,11 +640,14 @@ public void jobClose(Configuration conf, boolean success) return; } + LOG.info("JobClose of " + this); + jobCloseOp(conf, success); jobCloseDone = true; if (childOperators != null) { for (Operator op : childOperators) { + LOG.info("JobClose of " + op + " being called (child)"); op.jobClose(conf, success); } } @@ -851,6 +862,7 @@ public void logStats() { * * @return the name of the operator */ + @Override public String getName() { return getOperatorName(); } @@ -1061,7 +1073,7 @@ public boolean supportSkewJoinOptimization() { if (parents != null) { for (Operator parent : parents) { - parentClones.add((Operator)(parent.clone())); + parentClones.add((parent.clone())); } } @@ -1082,8 +1094,8 @@ public boolean supportSkewJoinOptimization() { public Operator cloneOp() throws CloneNotSupportedException { T descClone = (T) conf.clone(); Operator ret = - (Operator) OperatorFactory.getAndMakeChild( - descClone, getSchema()); + OperatorFactory.getAndMakeChild( + descClone, getSchema()); return ret; } @@ -1254,15 +1266,15 @@ public Statistics getStatistics() { } return null; } - + public OpTraits getOpTraits() { if (conf != null) { return conf.getOpTraits(); } - + return null; } - + public void setOpTraits(OpTraits metaInfo) { if (LOG.isDebugEnabled()) { LOG.debug("Setting traits ("+metaInfo+") on "+this); @@ -1299,7 +1311,24 @@ public static Operator createDummy() { private static class DummyOperator extends Operator { public DummyOperator() { super("dummy"); } + @Override public void processOp(Object row, int tag) { } + @Override public OperatorType getType() { return null; } } + + public Object getNextRow(String string) throws Exception { + // all operators except the MapOperator just fetch and process the row. 
+ assert (getParentOperators().size() == 1); + Object row = getParentOperators().get(0).getNextRow(string); + return row; + } + + public Map getConnectOps() { + if ((parentOperators == null) || (parentOperators.size() == 0)) { + return null; + } + Map dummyOps = parentOperators.get(0).getConnectOps(); + return dummyOps; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 487bb33..31c8658 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -226,7 +226,7 @@ private byte tagForAlias(String alias) { public void cleanUpInputFileChangedOp() throws HiveException { inputFileChanged = true; } - + protected List smbJoinComputeKeys(Object row, byte alias) throws HiveException { return JoinUtil.computeKeys(row, joinKeys[alias], joinKeysObjectInspectors[alias]); @@ -265,8 +265,8 @@ public void processOp(Object row, int tag) throws HiveException { byte alias = (byte) tag; // compute keys and values as StandardObjects - List key = smbJoinComputeKeys(row, alias); - + List key = smbJoinComputeKeys(row, alias); + List value = getFilteredValue(alias, row); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 1d6a93a..70f66eb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -143,6 +143,7 @@ import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -199,6 +200,7 @@ public static String HADOOP_LOCAL_FS = "file:///"; public static String MAP_PLAN_NAME = "map.xml"; public static String REDUCE_PLAN_NAME = "reduce.xml"; + public static String MERGE_PLAN_NAME = "merge.xml"; public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; @@ -278,7 +280,7 @@ public static void setMapWork(Configuration conf, MapWork work) { } public static MapWork getMapWork(Configuration conf) { - return (MapWork) getBaseWork(conf, MAP_PLAN_NAME); + return (MapWork) getBaseWork(conf, MAP_PLAN_NAME, MapWork.class); } public static void setReduceWork(Configuration conf, ReduceWork work) { @@ -286,7 +288,7 @@ public static void setReduceWork(Configuration conf, ReduceWork work) { } public static ReduceWork getReduceWork(Configuration conf) { - return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME); + return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME, ReduceWork.class); } public static void cacheBaseWork(Configuration conf, String name, BaseWork work, @@ -309,14 +311,17 @@ public static void setBaseWork(Configuration conf, String name, BaseWork work) { } /** - * Returns the Map or Reduce plan - * Side effect: the BaseWork returned is also placed in the gWorkMap + * Returns the Map or Reduce plan Side effect: the BaseWork returned is also placed in the + * gWorkMap + * * @param conf * @param name + * @param clazz * @return BaseWork based on the name supplied will return null if name is null - * @throws RuntimeException if the configuration files are not proper or if plan can not be loaded + * 
@throws RuntimeException + * if the configuration files are not proper or if plan can not be loaded */ - private static BaseWork getBaseWork(Configuration conf, String name) { + private static BaseWork getBaseWork(Configuration conf, String name, Class clazz) { BaseWork gWork = null; Path path = null; InputStream in = null; @@ -332,7 +337,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { } if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { - LOG.debug("Loading plan from string: "+path.toUri().getPath()); + LOG.info("Loading plan from string: " + path.toUri().getPath()); String planString = conf.get(path.toUri().getPath()); if (planString == null) { LOG.info("Could not find plan string in conf"); @@ -345,7 +350,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { in = new FileInputStream(localPath.toUri().getPath()); } - if(MAP_PLAN_NAME.equals(name)){ + if (MAP_PLAN_NAME.equals(name)) { if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ gWork = deserializePlan(in, MapWork.class, conf); } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) || @@ -366,6 +371,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) { throw new RuntimeException("unable to determine work from configuration ." + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; } + } else if (name.contains(MERGE_PLAN_NAME)) { + gWork = (BaseWork) deserializePlan(in, clazz, conf); } gWorkMap.put(path, gWork); } else { @@ -596,7 +603,8 @@ public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScra return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache); } - private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) { + private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, + String name, boolean useCache) { try { setPlanPath(conf, hiveScratchDir); @@ -608,6 +616,7 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch // add it to the conf ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED)); + serializePlan(w, out, conf); LOG.info("Setting plan: "+planPath.toUri().getPath()); conf.set(planPath.toUri().getPath(), @@ -1800,7 +1809,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I for (int i = 0; i < parts.length; ++i) { assert parts[i].isDir() : "dynamic partition " + parts[i].getPath() - + " is not a direcgtory"; + + " is not a directory"; FileStatus[] items = fs.listStatus(parts[i].getPath()); // remove empty directory since DP insert should not generate empty partitions. 
@@ -3462,7 +3471,7 @@ public static boolean createDirsWithPermission(Configuration conf, Path mkdir, return createDirsWithPermission(conf, mkdir, fsPermission, recursive); } - private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, + private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, String origUmask, FileSystem fs) throws IOException { if (unsetUmask) { if (origUmask != null) { @@ -3537,4 +3546,26 @@ public static String getQualifiedPath(HiveConf conf, Path path) throws HiveExcep public static boolean isDefaultNameNode(HiveConf conf) { return !conf.getChangedProperties().containsKey(HiveConf.ConfVars.HADOOPFS.varname); } + + public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir, + boolean b) { + for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) { + setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, b); + String prefixes = HiveConf.getVar(conf, HiveConf.ConfVars.TEZ_MERGE_WORK_FILE_PREFIXES); + HiveConf.setVar(conf, HiveConf.ConfVars.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes + "," + + baseWork.getName()); + } + + // nothing to return + return null; + } + + public static BaseWork getMergeWork(JobConf jconf, Class clazz) { + String prefix = HiveConf.getVar(jconf, HiveConf.ConfVars.TEZ_MERGE_FILE_PREFIX); + if (prefix == null || prefix.isEmpty()) { + return null; + } + + return getBaseWork(jconf, prefix + MERGE_PLAN_NAME, clazz); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java index 74bc2d2..6dc9fef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java @@ -52,6 +52,8 @@ private String currentBigBucketFile=null; + private boolean skipFileChangedCheck; + public String getCurrentBigBucketFile() { return currentBigBucketFile; } @@ -80,6 +82,9 @@ public void clear() { * @return is the input file changed? 
*/ public boolean inputFileChanged() { + if (skipFileChangedCheck) { + return false; + } if (!inputFileChecked) { currentInputPath = this.ioCxt.getInputPath(); inputFileChecked = true; @@ -153,4 +158,8 @@ public IOContext getIoCxt() { public void setIoCxt(IOContext ioCxt) { this.ioCxt = ioCxt; } + + public void setSkipFileChangedCheck(boolean skipFileChangedCheck) { + this.skipFileChangedCheck = skipFileChangedCheck; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index f2acd75..59e879b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.mapred.FileSplit; @@ -39,9 +40,9 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; @@ -71,6 +72,15 @@ */ public class CustomPartitionVertex extends VertexManagerPlugin { + public enum VertexType { + AUTO_INITIALIZED_EDGES, // no custom vertex or edge + INITIALIZED_EDGES, // custom vertex and custom edge but single MR Input + MULTI_INPUT_INITIALIZED_EDGES, // custom vertex, custom edge and multi + // MRInput + MULTI_INPUT_UNINITIALIZED_EDGES // custom vertex, no custom edge, multi + // MRInput + } + private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName()); VertexManagerPluginContext context; @@ -79,9 +89,14 @@ private List dataInformationEvents; private int numBuckets = -1; private Configuration conf = null; - private boolean rootVertexInitialized = false; private final SplitGrouper grouper = new SplitGrouper(); private int taskCount = 0; + private VertexType vertexType; + private String inputNameDecidingParallelism; + private final Multimap bucketToTaskMap = HashMultimap. 
create(); + + private final Map> inputToGroupedSplitMap = + new HashMap>(); public CustomPartitionVertex(VertexManagerPluginContext context) { super(context); @@ -90,14 +105,24 @@ public CustomPartitionVertex(VertexManagerPluginContext context) { @Override public void initialize() { this.context = getContext(); - ByteBuffer byteBuf = context.getUserPayload().getPayload(); - this.numBuckets = byteBuf.getInt(); + ByteBuffer payload = context.getUserPayload().getPayload(); + CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(payload); + try { + vertexConf.readFields(dibb); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.numBuckets = vertexConf.getNumBuckets(); + this.inputNameDecidingParallelism = vertexConf.getInputName(); + this.vertexType = vertexConf.getVertexType(); } @Override public void onVertexStarted(Map> completions) { int numTasks = context.getVertexNumTasks(context.getVertexName()); - List scheduledTasks = + List scheduledTasks = new ArrayList(numTasks); for (int i = 0; i < numTasks; ++i) { scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null)); @@ -117,13 +142,8 @@ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { @Override public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List events) { + LOG.info("On root vertex initialized"); - // Ideally, since there's only 1 Input expected at the moment - - // ensure this method is called only once. Tez will call it once per Root - // Input. - Preconditions.checkState(rootVertexInitialized == false); - LOG.info("Root vertex not initialized"); - rootVertexInitialized = true; try { // This is using the payload from the RootVertexInitializer corresponding // to InputName. Ideally it should be using it's own configuration class - @@ -164,9 +184,6 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr // No tasks should have been started yet. Checked by initial state // check. Preconditions.checkState(dataInformationEventSeen == false); - Preconditions - .checkState(context.getVertexNumTasks(context.getVertexName()) == -1, - "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism"); InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event; // The vertex cannot be configured until all DataEvents are seen - to @@ -224,17 +241,52 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); } - LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks"); - processAllEvents(inputName, bucketToGroupedSplitMap); + LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap); + if ((inputNameDecidingParallelism.isEmpty() == false) + && (inputNameDecidingParallelism.compareTo(inputName) != 0)) { + /* + * this is the small table side. In case of SMB join, we may need to send each split to the + * corresponding bucket-based task on the other side. In case a split needs to go to + * multiple downstream tasks, we need to clone the event and send it to the right + * destination. 
+ */ + processAllSideEvents(inputName, bucketToGroupedSplitMap); + } else { + processAllEvents(inputName, bucketToGroupedSplitMap); + } } catch (Exception e) { throw new RuntimeException(e); } } + private void processAllSideEvents(String inputName, + Multimap bucketToGroupedSplitMap) throws IOException { + // the bucket to task map should have been setup by the big table. + if (bucketToTaskMap.isEmpty()) { + inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap); + return; + } + List taskEvents = new ArrayList(); + for (Entry> entry : bucketToGroupedSplitMap.asMap().entrySet()) { + Collection destTasks = bucketToTaskMap.get(entry.getKey()); + for (Integer task : destTasks) { + for (InputSplit split : entry.getValue()) { + MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split); + InputDataInformationEvent diEvent = + InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit + .toByteString().asReadOnlyByteBuffer()); + diEvent.setTargetIndex(task); + taskEvents.add(diEvent); + } + } + } + + context.addRootInputEvents(inputName, taskEvents); + } + private void processAllEvents(String inputName, Multimap bucketToGroupedSplitMap) throws IOException { - Multimap bucketToTaskMap = HashMultimap. create(); List finalSplits = Lists.newLinkedList(); for (Entry> entry : bucketToGroupedSplitMap.asMap().entrySet()) { int bucketNum = entry.getKey(); @@ -248,11 +300,13 @@ private void processAllEvents(String inputName, // Construct the EdgeManager descriptor to be used by all edges which need // the routing table. - EdgeManagerPluginDescriptor hiveEdgeManagerDesc = - EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); - UserPayload payload = getBytePayload(bucketToTaskMap); - hiveEdgeManagerDesc.setUserPayload(payload); - + EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null; + if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES) + || (vertexType == VertexType.INITIALIZED_EDGES)) { + hiveEdgeManagerDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); + UserPayload payload = getBytePayload(bucketToTaskMap); + hiveEdgeManagerDesc.setUserPayload(payload); + } Map emMap = Maps.newHashMap(); // Replace the edge manager for all vertices which have routing type custom. @@ -285,13 +339,23 @@ private void processAllEvents(String inputName, rootInputSpecUpdate.put( inputName, InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate()); - context.setVertexParallelism( - taskCount, - VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits - .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + if ((inputNameDecidingParallelism.compareTo(inputName) == 0) + || (inputNameDecidingParallelism.isEmpty())) { + LOG.info("ZZZ: input name deciding parallelism is " + inputName); + context.setVertexParallelism( + taskCount, + VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits + .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + } // Set the actual events for the tasks. 
context.addRootInputEvents(inputName, taskEvents); + if (inputToGroupedSplitMap.isEmpty() == false) { + for (Entry> entry : inputToGroupedSplitMap.entrySet()) { + processAllSideEvents(entry.getKey(), entry.getValue()); + } + inputToGroupedSplitMap.clear(); + } } UserPayload getBytePayload(Multimap routingTable) throws IOException { @@ -315,7 +379,8 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws if (!(inputSplit instanceof FileSplit)) { throw new UnsupportedOperationException( - "Cannot handle splits other than FileSplit for the moment"); + "Cannot handle splits other than FileSplit for the moment. Current input split type: " + + inputSplit.getClass().getSimpleName()); } return (FileSplit) inputSplit; } @@ -327,7 +392,6 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws Map> pathFileSplitsMap) { int bucketNum = 0; - int fsCount = 0; Multimap bucketToInitialSplitMap = ArrayListMultimap. create(); @@ -335,15 +399,22 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws for (Map.Entry> entry : pathFileSplitsMap.entrySet()) { int bucketId = bucketNum % numBuckets; for (FileSplit fsplit : entry.getValue()) { - fsCount++; bucketToInitialSplitMap.put(bucketId, fsplit); } bucketNum++; } - LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: " - + pathFileSplitsMap.size()); + if (bucketNum < numBuckets) { + int loopedBucketId = 0; + for (; bucketNum < numBuckets; bucketNum++) { + for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) { + bucketToInitialSplitMap.put(bucketNum, fsplit); + } + loopedBucketId++; + } + } return bucketToInitialSplitMap; } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java new file mode 100644 index 0000000..22d5eb3 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.tez.CustomPartitionVertex.VertexType; +import org.apache.hadoop.io.Writable; + +/* + * This class is used to send a byte buffer configuration to the CustomPartitionVertex class. 
+ */ +public class CustomVertexConfiguration implements Writable { + + private int numBuckets; + private VertexType vertexType; + private String inputName; + + public CustomVertexConfiguration() { + } + + public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) { + this.numBuckets = numBuckets; + this.vertexType = vertexType; + this.inputName = inputName; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(this.vertexType.ordinal()); + out.writeInt(this.numBuckets); + out.writeUTF(inputName); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.vertexType = VertexType.values()[in.readInt()]; + this.numBuckets = in.readInt(); + this.inputName = in.readUTF(); + } + + public int getNumBuckets() { + return numBuckets; + } + + public VertexType getVertexType() { + return vertexType; + } + + public String getInputName() { + return inputName; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 1ef6cc5..badca2f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; +import org.apache.hadoop.hive.ql.exec.tez.CustomPartitionVertex.VertexType; import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput; import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; @@ -56,6 +57,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; @@ -102,6 +104,7 @@ import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; @@ -124,6 +127,7 @@ private static final Log LOG = LogFactory.getLog(DagUtils.class.getName()); private static final String TEZ_DIR = "_tez_scratch_dir"; private static DagUtils instance; + private List customizedVertexList = new ArrayList(); private void addCredentials(MapWork mapWork, DAG dag) { Set paths = mapWork.getPathToAliases().keySet(); @@ -206,14 +210,17 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) { } /** - * Given a Vertex group and a vertex createEdge will create an - * Edge between them. + * Given a Vertex group and a vertex createEdge will create an Edge between + * them. * - * @param group The parent VertexGroup - * @param vConf The job conf of one of the parrent (grouped) vertices - * @param w The child vertex - * @param edgeProp the edge property of connection between the two - * endpoints. 
+ * @param group + * The parent VertexGroup + * @param vConf + * The job conf of one of the parent (grouped) vertices + * @param w + * The child vertex + * @param edgeProp + * the edge property of connection between the two endpoints. */ @SuppressWarnings("rawtypes") public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, @@ -231,13 +238,15 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, break; case CUSTOM_EDGE: { mergeInputClass = ConcatenatedMergedKeyValueInput.class; - int numBuckets = edgeProp.getNumBuckets(); - VertexManagerPluginDescriptor desc = - VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); - ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); - userPayload.flip(); - desc.setUserPayload(UserPayload.create(userPayload)); - w.setVertexManagerPlugin(desc); + if (customizedVertexList.contains(w) == false) { // otherwise, we have already setup the desc + int numBuckets = edgeProp.getNumBuckets(); + VertexManagerPluginDescriptor desc = + VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); + ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); + userPayload.flip(); + desc.setUserPayload(UserPayload.create(userPayload)); + w.setVertexManagerPlugin(desc); + } break; } @@ -273,13 +282,19 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { - int numBuckets = edgeProp.getNumBuckets(); - ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); - userPayload.flip(); - VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create( - CustomPartitionVertex.class.getName()); - desc.setUserPayload(UserPayload.create(userPayload)); - w.setVertexManagerPlugin(desc); + if (customizedVertexList.contains(w) == false) { // desc already setup otherwise + int numBuckets = edgeProp.getNumBuckets(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, VertexType.INITIALIZED_EDGES, ""); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); + VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create( + CustomPartitionVertex.class.getName()); + byte[] userPayloadBytes = dob.getData(); + ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes); + desc.setUserPayload(UserPayload.create(userPayload)); + w.setVertexManagerPlugin(desc); + } break; } case SIMPLE_EDGE: { @@ -309,37 +324,40 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration switch (edgeType) { case BROADCAST_EDGE: UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig - .newBuilder(keyClass, valClass).setFromConfiguration(conf).build(); + .newBuilder(keyClass, valClass).setFromConfiguration(conf).build(); return et1Conf.createDefaultBroadcastEdgeProperty(); + case CUSTOM_EDGE: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf).build(); EdgeManagerPluginDescriptor edgeDesc = - EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); + EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); CustomEdgeConfiguration edgeConf = - new 
CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); + new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); DataOutputBuffer dob = new DataOutputBuffer(); edgeConf.write(dob); byte[] userPayload = dob.getData(); edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); + case CUSTOM_SIMPLE_EDGE: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf).build(); return et3Conf.createDefaultEdgeProperty(); + case SIMPLE_EDGE: default: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf).build(); return et4Conf.createDefaultEdgeProperty(); } } @@ -407,12 +425,59 @@ private String getContainerJavaOpts(Configuration conf) { return MRHelpers.getJavaOptsForMRMapper(conf); } + private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr, + List additionalLr, FileSystem fs, Path mrScratchDir, Context ctx) + throws Exception { + Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false); + Vertex mergeVx = null; + if (mergeJoinWork.getMainWork() instanceof MapWork) { + List mapWorkList = mergeJoinWork.getBaseWorkList(); + MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork()); + mergeVx = createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx); + + // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat + // here would cause pre-mature grouping which would be incorrect. + Class inputFormatClass = HiveInputFormat.class; + conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class); + // mapreduce.tez.input.initializer.serialize.event.payload should be set + // to false when using this plug-in to avoid getting a serialized event at run-time. 
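Stepping back to the CustomPartitionVertex change above: getBucketSplitMapForPath now assigns each path's splits to buckets round-robin and, when there are fewer paths than buckets, wraps around so that every bucket task still receives input. A toy version of that assignment, using Guava's ArrayListMultimap as the patch does; the string values stand in for FileSplits and all names are illustrative:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

public class BucketAssignmentSketch {

  // Keys of pathToSplits stand in for file paths, values for the splits under each path.
  static Multimap<Integer, String> assign(Map<String, List<String>> pathToSplits, int numBuckets) {
    Multimap<Integer, String> bucketToSplits = ArrayListMultimap.create();
    int bucketNum = 0;
    // Round-robin: the i-th path's splits all land in bucket (i % numBuckets).
    for (Map.Entry<String, List<String>> entry : pathToSplits.entrySet()) {
      bucketToSplits.putAll(bucketNum % numBuckets, entry.getValue());
      bucketNum++;
    }
    // Fewer paths than buckets: fill the remaining buckets by replicating earlier ones,
    // so every downstream bucket task still receives input.
    int loopedBucketId = 0;
    for (; bucketNum < numBuckets; bucketNum++, loopedBucketId++) {
      bucketToSplits.putAll(bucketNum, new ArrayList<String>(bucketToSplits.get(loopedBucketId)));
    }
    return bucketToSplits;
  }

  public static void main(String[] args) {
    Map<String, List<String>> input = new LinkedHashMap<String, List<String>>();
    input.put("000000_0", Arrays.asList("splitA", "splitB"));
    input.put("000001_0", Arrays.asList("splitC"));
    // Prints {0=[splitA, splitB], 1=[splitC], 2=[splitA, splitB], 3=[splitC]}
    System.out.println(assign(input, 4));
  }
}

Copying the looped bucket's collection before re-inserting it avoids writing into a live Multimap view while it is being read.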
+ conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false); + for (int i = 0; i < mapWorkList.size(); i++) { + + mapWork = (MapWork) (mapWorkList.get(i)); + HiveConf.setVar(conf, HiveConf.ConfVars.TEZ_MERGE_FILE_PREFIX, mapWork.getName()); + LOG.info("Going through each work and adding MultiMRInput"); + mergeVx.addDataSource(mapWork.getName(), + MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build()); + } + + VertexManagerPluginDescriptor desc = + VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); + VertexType vertexType = mergeJoinWork.getVertexType(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf() + .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias()); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); + byte[] userPayload = dob.getData(); + desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); + mergeVx.setVertexManagerPlugin(desc); + } else { + mergeVx = + createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs, + mrScratchDir, ctx); + } + customizedVertexList.add(mergeVx); + return mergeVx; + } + /* * Helper function to create Vertex from MapWork. */ private Vertex createVertex(JobConf conf, MapWork mapWork, LocalResource appJarLr, List additionalLr, FileSystem fs, - Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception { + Path mrScratchDir, Context ctx) + throws Exception { Path tezDir = getTezDir(mrScratchDir); @@ -424,7 +489,6 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, // finally create the vertex Vertex map = null; - // use tez to combine splits boolean groupSplitsInInputInitializer; @@ -434,15 +498,8 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, Class inputFormatClass = conf.getClass("mapred.input.format.class", InputFormat.class); - boolean vertexHasCustomInput = false; - if (tezWork != null) { - for (BaseWork baseWork : tezWork.getParents(mapWork)) { - if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) { - vertexHasCustomInput = true; - } - } - } - + boolean vertexHasCustomInput = !(mapWork.getDoSplitsGrouping()); + LOG.info("Vertex has custom input? " + vertexHasCustomInput); if (vertexHasCustomInput) { groupSplitsInInputInitializer = false; // grouping happens in execution phase. The input payload should not enable grouping here, @@ -882,12 +939,22 @@ public JobConf initializeVertexConf(JobConf conf, BaseWork work) { return initializeVertexConf(conf, (MapWork)work); } else if (work instanceof ReduceWork) { return initializeVertexConf(conf, (ReduceWork)work); + } else if (work instanceof MergeJoinWork) { + return initializeVertexConf(conf, (MergeJoinWork) work); } else { assert false; return null; } } + private JobConf initializeVertexConf(JobConf conf, MergeJoinWork work) { + if (work.getMainWork() instanceof MapWork) { + return initializeVertexConf(conf, (MapWork) (work.getMainWork())); + } else { + return initializeVertexConf(conf, (ReduceWork) (work.getMainWork())); + } + } + /** * Create a vertex from a given work object. * @@ -912,10 +979,13 @@ public Vertex createVertex(JobConf conf, BaseWork work, // BaseWork. 
if (work instanceof MapWork) { v = createVertex(conf, (MapWork) work, appJarLr, - additionalLr, fileSystem, scratchDir, ctx, tezWork); + additionalLr, fileSystem, scratchDir, ctx); } else if (work instanceof ReduceWork) { v = createVertex(conf, (ReduceWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx); + } else if (work instanceof MergeJoinWork) { + v = createVertex(conf, (MergeJoinWork) work, appJarLr, + additionalLr, fileSystem, scratchDir, ctx); } else { // something is seriously wrong if this is happening throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 7556d7b..3e5db7c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -25,6 +28,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -36,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -44,6 +50,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; @@ -57,12 +64,16 @@ public class MapRecordProcessor extends RecordProcessor { + private static KeyValueReader mergeQueue; private MapOperator mapOp; + private MapOperator mergeMapOp = null; public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); private final ExecMapperContext execContext = new ExecMapperContext(); private boolean abort = false; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; + private static Map connectOps = + new HashMap(); public MapRecordProcessor(JobConf jconf) { ObjectCache cache = ObjectCacheFactory.getCache(jconf); @@ -105,12 +116,45 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep } try { + + List mergeWorkList = new ArrayList(); + String prefixes = HiveConf.getVar(jconf, HiveConf.ConfVars.TEZ_MERGE_WORK_FILE_PREFIXES); + for (String prefix : prefixes.split(",")) { + if ((prefix != null) && (prefix.isEmpty() == false)) { + HiveConf.setVar(jconf, HiveConf.ConfVars.TEZ_MERGE_FILE_PREFIX, prefix); + MapWork mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, MapWork.class); + mergeWorkList.add(mergeMapWork); + 
} + } + + l4j.info("Plan: " + mapWork); if (mapWork.getVectorMode()) { mapOp = new VectorMapOperator(); } else { mapOp = new MapOperator(); } + connectOps.clear(); + mergeQueue = null; + if (mergeWorkList != null) { + for (MapWork mergeMapWork : mergeWorkList) { + if (mergeMapWork.getVectorMode()) { + mergeMapOp = new VectorMapOperator(); + } else { + mergeMapOp = new MapOperator(); + } + // initialize the merge operators first. + if (mergeMapOp != null) { + mergeMapOp.setConf(mergeMapWork); + mergeMapOp.setChildrenSecondary(jconf); + DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp); + connectOps.put(dummyOp, mergeMapWork.getTag()); + mergeMapOp.setExecContext(execContext); + mergeMapOp.initializeLocalWork(jconf); + } + } + } + // initialize map operator mapOp.setConf(mapWork); mapOp.setChildren(jconf); @@ -120,7 +164,11 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep ((TezContext)MapredContext.get()).setInputs(inputs); mapOp.setExecContext(execContext); mapOp.initializeLocalWork(jconf); + mapOp.initialize(jconf, null); + if (mergeMapOp != null) { + mergeMapOp.initialize(jconf, null); + } // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the @@ -150,8 +198,20 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } + private DummyStoreOperator getJoinParentOp(Operator mergeMapOp) { + for (Operator childOp : mergeMapOp.getChildOperators()) { + l4j.info("Child op we are walking: " + childOp); + if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) { + return (DummyStoreOperator) childOp; + } else { + return getJoinParentOp(childOp); + } + } + return null; + } + @Override - void run() throws IOException{ + void run() throws IOException, InterruptedException { MRInputLegacy in = TezProcessor.getMRInput(inputs); KeyValueReader reader = in.getReader(); @@ -213,6 +273,9 @@ void close(){ return; } mapOp.close(abort); + if (mergeMapOp != null) { + mergeMapOp.close(abort); + } // Need to close the dummyOps as well. 
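MapRecordProcessor.init above finds its merged map works by splitting the TEZ_MERGE_WORK_FILE_PREFIXES value on commas, pointing TEZ_MERGE_FILE_PREFIX at each entry, and loading the matching MapWork plan. A small sketch of just the prefix handling, assuming the ConfVars entries this patch adds; the Utilities.getMergeWork lookup is left out:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class MergePrefixParsing {

  // One prefix per merged MapWork, comma separated; empty entries are skipped,
  // mirroring the loop in MapRecordProcessor.init above.
  static List<String> mergeWorkPrefixes(JobConf jconf) {
    List<String> prefixes = new ArrayList<String>();
    String raw = HiveConf.getVar(jconf, HiveConf.ConfVars.TEZ_MERGE_WORK_FILE_PREFIXES);
    if (raw == null) {
      return prefixes;
    }
    for (String prefix : raw.split(",")) {
      if (!prefix.isEmpty()) {
        prefixes.add(prefix);
      }
    }
    return prefixes;
  }

  public static void main(String[] args) {
    JobConf jconf = new JobConf();
    HiveConf.setVar(jconf, HiveConf.ConfVars.TEZ_MERGE_WORK_FILE_PREFIXES, "Map 2,Map 3");
    // Prints [Map 2, Map 3]; the real code then loads one MapWork plan per prefix.
    System.out.println(mergeWorkPrefixes(jconf));
  }
}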
The operator pipeline // is not considered "closed/done" unless all operators are @@ -241,4 +304,24 @@ void close(){ MapredContext.close(); } } + + public static Object getNextRow(String inputName) throws Exception { + // get the next row from the priority queue corresponding to this input name + if (mergeQueue == null) { + MultiMRInput multiMRInput = TezProcessor.getInput(inputName); + Collection kvReaders = multiMRInput.getKeyValueReaders(); + List kvReaderList = new ArrayList(kvReaders); + mergeQueue = new KeyValueInputMerger(kvReaderList); + } + + if (mergeQueue.next()) { + return mergeQueue.getCurrentValue(); + } else { + return null; + } + } + + public static Map getConnectOps() { + return connectOps; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index e278572..39efc0b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -27,6 +28,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.ObjectCache; @@ -37,13 +40,14 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; -import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValuesInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -53,8 +57,10 @@ import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; @@ -67,6 +73,8 @@ import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValuesReader; +import sun.tools.tree.ThisExpression; + /** * Process input from tez LogicalInput and write output - for a map plan * Just pump the records through the 
query plan. @@ -78,14 +86,14 @@ public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); private final ExecMapperContext execContext = new ExecMapperContext(); private boolean abort = false; - private Deserializer inputKeyDeserializer; + private static Deserializer inputKeyDeserializer; // Input value serde needs to be an array to support different SerDe // for different tags - private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE]; + private final static SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE]; TableDesc keyTableDesc; - TableDesc[] valueTableDesc; + static TableDesc[] valueTableDesc; ObjectInspector[] rowObjectInspector; private Operator reducer; @@ -94,7 +102,7 @@ private Object keyObject = null; private BytesWritable groupKey; - private ReduceWork redWork; + private static ReduceWork redWork; private boolean vectorized = false; @@ -110,6 +118,9 @@ /* this is only used in the error code path */ private List[] valueStringWriters; + private static Map inputNameKVSReaderMap = + new HashMap(); + @Override void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, Map inputs, Map outputs) throws Exception { @@ -118,6 +129,10 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep ObjectCache cache = ObjectCacheFactory.getCache(jconf); + for (Entry entry : inputs.entrySet()) { + l4j.info("REDUCER name : " + entry.getKey() + " Logical input type: " + entry.getValue()); + } + rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; @@ -131,14 +146,14 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep } reducer = redWork.getReducer(); - reducer.setParentOperators(null); // clear out any parents as reducer is the - // root + reducer.getParentOperators().clear(); + reducer.setParentOperators(null); // clear out any parents as reducer is the root isTagged = redWork.getNeedsTagging(); vectorized = redWork.getVectorMode(); try { keyTableDesc = redWork.getKeyDesc(); - inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc .getDeserializerClass(), null); SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); keyObjectInspector = inputKeyDeserializer.getObjectInspector(); @@ -150,7 +165,7 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep keyStructInspector = (StructObjectInspector)keyObjectInspector; batches = new VectorizedRowBatch[maxTags]; valueStructInspectors = new StructObjectInspector[maxTags]; - valueStringWriters = (List[])new List[maxTags]; + valueStringWriters = new List[maxTags]; keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); buffer = new DataOutputBuffer(); } @@ -258,7 +273,24 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep @Override void run() throws Exception { - List shuffleInputs = getShuffleInputs(inputs); + inputNameKVSReaderMap.clear(); + Map inputEntry = new HashMap(); + if (redWork.getReducer() instanceof CommonMergeJoinOperator) { + CommonMergeJoinOperator mergeJoinOp = (CommonMergeJoinOperator) redWork.getReducer(); + String inputName = + mergeJoinOp.getAliasToInputNameMap().get(mergeJoinOp.getConf().getBigTablePosition()); + inputEntry.put(inputName, inputs.get(inputName)); + for (Entry entry : 
mergeJoinOp.getAliasToInputNameMap().entrySet()) { + if (entry.getKey() == mergeJoinOp.getConf().getBigTablePosition()) { + continue; + } + initializePriorityQueueForInput(entry.getValue(), inputs.get(entry.getValue())); + } + } else { + inputEntry.putAll(inputs); + } + + List shuffleInputs = getShuffleInputs(inputEntry); if (shuffleInputs != null) { l4j.info("Waiting for ShuffleInputs to become ready"); processorContext.waitForAllInputsReady(new ArrayList(shuffleInputs)); @@ -277,7 +309,7 @@ void run() throws Exception { kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader(); }else { //get a sort merged input - kvsReader = new InputMerger(shuffleInputs); + kvsReader = new KeyValuesInputMerger(shuffleInputs); } } catch (Exception e) { throw new IOException(e); @@ -291,7 +323,32 @@ void run() throws Exception { break; } } + } + + private void initializePriorityQueueForInput(String key, LogicalInput value) + throws InterruptedException, IOException { + Map inputEntry = new HashMap(); + inputEntry.put(key, value); + List shuffleInputs = getShuffleInputs(inputEntry); + if (shuffleInputs != null) { + l4j.info("Waiting for ShuffleInputs to become ready"); + processorContext.waitForAllInputsReady(new ArrayList(shuffleInputs)); + } + + KeyValuesReader kvsReader; + try { + if (shuffleInputs.size() == 1) { + // no merging of inputs required + kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader(); + } else { + // get a sort merged input + kvsReader = new KeyValuesInputMerger(shuffleInputs); + } + } catch (Exception e) { + throw new IOException(e); + } + inputNameKVSReaderMap.put(key, kvsReader); } /** @@ -304,7 +361,10 @@ void run() throws Exception { Map tag2input = redWork.getTagToInput(); ArrayList shuffleInputs = new ArrayList(); for(String inpStr : tag2input.values()){ - shuffleInputs.add((LogicalInput)inputs.get(inpStr)); + if (inputs.get(inpStr) == null) { + continue; + } + shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; } @@ -336,6 +396,14 @@ private boolean processRows(Object key, Iterable values) { } //Set the key, check if this is a new group or same group + try { + this.keyObject = inputKeyDeserializer.deserialize(keyWritable); + } catch (Exception e) { + throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input key from " + + Utilities.formatBinaryString(keyWritable.getBytes(), 0, keyWritable.getLength()) + + " with properties " + keyTableDesc.getProperties(), e); + } + if (!keyWritable.equals(this.groupKey)) { // If a operator wants to do some work at the beginning of a group if (groupKey == null) { // the first group @@ -348,15 +416,6 @@ private boolean processRows(Object key, Iterable values) { reducer.endGroup(); } - try { - this.keyObject = inputKeyDeserializer.deserialize(keyWritable); - } catch (Exception e) { - throw new HiveException( - "Hive Runtime Error: Unable to deserialize reduce input key from " - + Utilities.formatBinaryString(keyWritable.getBytes(), 0, - keyWritable.getLength()) + " with properties " - + keyTableDesc.getProperties(), e); - } groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength()); if (isLogTraceEnabled) { l4j.trace("Start Group"); @@ -382,7 +441,8 @@ private boolean processRows(Object key, Iterable values) { } } - private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException { + private static Object deserializeValue(BytesWritable valueWritable, byte tag) + throws HiveException { try { return inputValueDeserializer[tag].deserialize(valueWritable); } catch 
(SerDeException e) { @@ -402,12 +462,24 @@ private Object deserializeValue(BytesWritable valueWritable, byte tag) throws Hi */ private boolean processKeyValues(Iterable values, byte tag) throws HiveException { + List passDownKey = null; for (Object value : values) { BytesWritable valueWritable = (BytesWritable) value; row.clear(); - row.add(this.keyObject); + if (passDownKey == null) { + row.add(this.keyObject); + } else { + row.add(passDownKey.get(0)); + } row.add(deserializeValue(valueWritable, tag)); + if (passDownKey == null) { + passDownKey = + (List) ObjectInspectorUtils.copyToStandardObject(row, + reducer.getInputObjInspectors()[tag], ObjectInspectorCopyOption.WRITABLE); + row.remove(0); + row.add(0, passDownKey.get(0)); + } try { reducer.processOp(row, tag); @@ -533,4 +605,30 @@ void close(){ } } + public static Object getNextRow(String inputName) throws Exception { + List nextRow = new ArrayList(Utilities.reduceFieldNameList.size()); + KeyValuesReader kvsReader = inputNameKVSReaderMap.get(inputName); + Iterable values = null; + if (kvsReader.next()) { + values = kvsReader.getCurrentValues(); + Object key = kvsReader.getCurrentKey(); + BytesWritable keyWritable = (BytesWritable) key; + Object keyObject = inputKeyDeserializer.deserialize(keyWritable); + nextRow.clear(); + nextRow.add(keyObject); + int tag = 0; + Map tag2input = redWork.getTagToInput(); + for (Entry entry : tag2input.entrySet()) { + if (entry.getValue().equals(inputName)) { + tag = entry.getKey(); + } + } + for (Object value : values) { + BytesWritable valueWritable = (BytesWritable) value; + nextRow.add(deserializeValue(valueWritable, (byte) tag)); + } + } + + return nextRow; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 8b023dc..e28d11e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.text.NumberFormat; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -32,6 +34,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; @@ -58,6 +61,9 @@ private static final String CLASS_NAME = TezProcessor.class.getName(); private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); + private ProcessorContext processorContext; + private static Map multiMRInputMap; + protected static final NumberFormat taskIdFormat = NumberFormat.getInstance(); protected static final NumberFormat jobIdFormat = NumberFormat.getInstance(); static { @@ -88,7 +94,9 @@ public void initialize() throws IOException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); this.jobConf = new JobConf(conf); - setupMRLegacyConfigs(getContext()); + this.processorContext = getContext(); + setupMRLegacyConfigs(processorContext); + multiMRInputMap = new HashMap(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } @@ -132,10 +140,12 @@ public void run(Map inputs, Map out if (isMap) { rproc = new 
MapRecordProcessor(jobConf); MRInputLegacy mrInput = getMRInput(inputs); - try { - mrInput.init(); - } catch (IOException e) { - throw new RuntimeException("Failed while initializing MRInput", e); + if (mrInput != null) { + try { + mrInput.init(); + } catch (IOException e) { + throw new RuntimeException("Failed while initializing MRInput", e); + } } } else { rproc = new ReduceRecordProcessor(); @@ -207,18 +217,29 @@ public void collect(Object key, Object value) throws IOException { } } - static MRInputLegacy getMRInput(Map inputs) { + static MRInputLegacy getMRInput(Map inputs) throws InterruptedException, + IOException { //there should be only one MRInput MRInputLegacy theMRInput = null; - for(LogicalInput inp : inputs.values()){ - if(inp instanceof MRInputLegacy){ + LOG.info("The input names are: " + Arrays.toString(inputs.keySet().toArray())); + for (Entry inp : inputs.entrySet()) { + if (inp.getValue() instanceof MRInputLegacy) { if(theMRInput != null){ throw new IllegalArgumentException("Only one MRInput is expected"); } //a better logic would be to find the alias - theMRInput = (MRInputLegacy)inp; + theMRInput = (MRInputLegacy) inp.getValue(); + } else if (inp.getValue() instanceof MultiMRInput) { + LOG.info("Found input type MultiMRInput: " + inp); + multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue()); } } return theMRInput; } + + public static MultiMRInput getInput(String inputName) { + LOG.info("Getting input for name: " + multiMRInputMap); + LOG.info("Multi mr input is " + multiMRInputMap.get(inputName) + " input name: " + inputName); + return multiMRInputMap.get(inputName); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java new file mode 100644 index 0000000..08c454b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.library.api.KeyValueReader; + +/** + * A KeyValuesReader implementation that returns a sorted stream of key-values + * by doing a sorted merge of the key-value in LogicalInputs. + * Tags are in the last byte of the key, so no special handling for tags is required. + * Uses a priority queue to pick the KeyValuesReader of the input that is next in + * sort order. 
+ */ +public class KeyValueInputMerger extends KeyValueReader { + + public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); + private PriorityQueue pQueue = null; + private KeyValueReader nextKVReader = null; + + public KeyValueInputMerger(List multiMRInputs) throws Exception { + //get KeyValuesReaders from the LogicalInput and add them to priority queue + int initialCapacity = multiMRInputs.size(); + pQueue = new PriorityQueue(initialCapacity, new KVReaderComparator()); + l4j.info("Initialized the priority queue with multi mr inputs: " + multiMRInputs.size()); + for (KeyValueReader input : multiMRInputs) { + addToQueue(input); + } + } + + /** + * Add KeyValueReader to queue if it has more key-value + * + * @param kvReader + * @throws IOException + */ + private void addToQueue(KeyValueReader kvReader) throws IOException { + if (kvReader.next()) { + pQueue.add(kvReader); + } + } + + /** + * @return true if there are more key-values and advances to next key-values + * @throws IOException + */ + @Override + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if(nextKVReader != null){ + addToQueue(nextKVReader); + } + + //get the new nextKVReader with lowest key + nextKVReader = pQueue.poll(); + return nextKVReader != null; + } + + @Override + public Object getCurrentKey() throws IOException { + return nextKVReader.getCurrentKey(); + } + + @Override + public Object getCurrentValue() throws IOException { + return nextKVReader.getCurrentValue(); + } + + /** + * Comparator that compares KeyValuesReader on their current key + */ + class KVReaderComparator implements Comparator { + + @Override + public int compare(KeyValueReader kvReadr1, KeyValueReader kvReadr2) { + try { + BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentValue(); + BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentValue(); + return key1.compareTo(key2); + } catch (IOException e) { + l4j.error("Caught exception while reading shuffle input", e); + //die! + throw new RuntimeException(e); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java new file mode 100644 index 0000000..c06907e --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.library.api.KeyValuesReader; + +/** + * A KeyValuesReader implementation that returns a sorted stream of key-values + * by doing a sorted merge of the key-value in LogicalInputs. + * Tags are in the last byte of the key, so no special handling for tags is required. + * Uses a priority queue to pick the KeyValuesReader of the input that is next in + * sort order. + */ +public class KeyValuesInputMerger extends KeyValuesReader { + + public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); + private PriorityQueue pQueue = null; + private KeyValuesReader nextKVReader = null; + + public KeyValuesInputMerger(List shuffleInputs) throws Exception { + //get KeyValuesReaders from the LogicalInput and add them to priority queue + int initialCapacity = shuffleInputs.size(); + pQueue = new PriorityQueue(initialCapacity, new KVReaderComparator()); + for(Input input : shuffleInputs){ + addToQueue((KeyValuesReader)input.getReader()); + } + } + + /** + * Add KeyValuesReader to queue if it has more key-values + * @param kvsReadr + * @throws IOException + */ + private void addToQueue(KeyValuesReader kvsReadr) throws IOException{ + if(kvsReadr.next()){ + pQueue.add(kvsReadr); + } + } + + /** + * @return true if there are more key-values and advances to next key-values + * @throws IOException + */ + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if(nextKVReader != null){ + addToQueue(nextKVReader); + } + + //get the new nextKVReader with lowest key + nextKVReader = pQueue.poll(); + return nextKVReader != null; + } + + public Object getCurrentKey() throws IOException { + return nextKVReader.getCurrentKey(); + } + + public Iterable getCurrentValues() throws IOException { + return nextKVReader.getCurrentValues(); + } + + /** + * Comparator that compares KeyValuesReader on their current key + */ + class KVReaderComparator implements Comparator { + + @Override + public int compare(KeyValuesReader kvReadr1, KeyValuesReader kvReadr2) { + try { + BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentKey(); + BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentKey(); + return key1.compareTo(key2); + } catch (IOException e) { + l4j.error("Caught exception while reading shuffle input", e); + //die! 
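Both new mergers follow the same shape: keep one reader per sorted input in a PriorityQueue ordered by that reader's current element, poll the smallest, emit it, advance it, and re-enqueue it (the key-value variant above orders readers by their current value, the key-values variant by their current key). A self-contained analogue over plain iterators, with illustrative names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

  // A cursor plays the role of a KeyValueReader: it exposes one current element at a time.
  static final class Cursor {
    final Iterator<Integer> it;
    Integer current;

    Cursor(Iterator<Integer> it) {
      this.it = it;
    }

    boolean advance() {
      current = it.hasNext() ? it.next() : null;
      return current != null;
    }
  }

  static List<Integer> merge(List<List<Integer>> sortedInputs) {
    PriorityQueue<Cursor> queue = new PriorityQueue<Cursor>(
        Math.max(1, sortedInputs.size()),
        new Comparator<Cursor>() {
          @Override
          public int compare(Cursor a, Cursor b) {
            return a.current.compareTo(b.current);
          }
        });
    for (List<Integer> input : sortedInputs) {
      Cursor c = new Cursor(input.iterator());
      if (c.advance()) {          // only enqueue cursors that have a current element
        queue.add(c);
      }
    }
    List<Integer> out = new ArrayList<Integer>();
    while (!queue.isEmpty()) {
      Cursor smallest = queue.poll();   // the input whose current element sorts first
      out.add(smallest.current);
      if (smallest.advance()) {         // re-enqueue it behind its next element
        queue.add(smallest);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Prints [1, 2, 3, 4, 5, 9, 10]
    System.out.println(merge(Arrays.asList(
        Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10), Arrays.asList(5))));
  }
}

Re-enqueueing only readers that still have an element is what lets exhausted inputs drop out of the merge naturally.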
+ throw new RuntimeException(e); + } + } + } + + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java index 9801a0d..277be4c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java @@ -40,7 +40,7 @@ public TezMergedLogicalInput(MergedInputContext context, List inputs) { @Override public Reader getReader() throws Exception { - return new InputMerger(getInputs()); + return new KeyValuesInputMerger(getInputs()); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 3560442..47a4860 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hive.ql.plan; -import java.util.LinkedList; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,6 +41,7 @@ // Their function is mainly as root ops to give the mapjoin the correct // schema info. List dummyOps; + int tag; public BaseWork() {} @@ -100,7 +101,7 @@ public void addDummyOp(HashTableDummyOperator dummyOp) { // add all children opStack.addAll(opSet); - + while(!opStack.empty()) { Operator op = opStack.pop(); returnSet.add(op); @@ -139,4 +140,12 @@ public boolean getVectorMode() { } public abstract void configureJobConf(JobConf job); + + public void setTag(int tag) { + this.tag = tag; + } + + public int getTag() { + return tag; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java new file mode 100644 index 0000000..62f0dff --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +@Explain(displayName = "Common Merge Join Operator") +public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable { + private static final long serialVersionUID = 1L; + private int numBuckets; + private boolean isSubQuery; + private int mapJoinConversionPos; + private boolean isMapSideJoin; + + CommonMergeJoinDesc() { + } + + public CommonMergeJoinDesc(int numBuckets, boolean isSubQuery, int mapJoinConversionPos, + MapJoinDesc joinDesc) { + super(joinDesc); + this.numBuckets = numBuckets; + this.isSubQuery = isSubQuery; + this.mapJoinConversionPos = mapJoinConversionPos; + } + + public boolean getCustomMerge() { + return isSubQuery; + } + + public int getNumBuckets() { + return numBuckets; + } + + public int getBigTablePosition() { + return mapJoinConversionPos; + } + + public void setIsMapSideJoin(boolean isMapSideJoin) { + this.isMapSideJoin = isMapSideJoin; + } + + @Override + public boolean isMapSideJoin() { + return this.isMapSideJoin; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 57ab9de..156a4e9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -198,7 +198,7 @@ public void setDumpFilePrefix(String dumpFilePrefix) { } return keyMap; } - + /** * @return the keys */ @@ -323,12 +323,16 @@ public void setHashTableMemoryUsage(float hashtableMemoryUsage) { public float getHashTableMemoryUsage() { return hashtableMemoryUsage; } - + public void setCustomBucketMapJoin(boolean customBucketMapJoin) { this.customBucketMapJoin = customBucketMapJoin; } - + public boolean getCustomBucketMapJoin() { return this.customBucketMapJoin; } + + public boolean isMapSideJoin() { + return true; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index b4278d3..39639b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -26,9 +26,9 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -116,6 +116,10 @@ private boolean useOneNullRowInputFormat; + private final boolean vectorMode = false; + + private boolean doSplitsGrouping = true; + public MapWork() {} public MapWork(String name) { @@ -525,4 +529,12 @@ public void logPathToAliases() { } } } + + public void setDoSplitsGrouping(boolean doSplitsGrouping) { + this.doSplitsGrouping = doSplitsGrouping; + } + + public boolean getDoSplitsGrouping() { + return this.doSplitsGrouping; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java new file mode 100644 index 0000000..d4bd64a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java @@ -0,0 +1,139 @@ +package org.apache.hadoop.hive.ql.plan; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; +import 
org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.tez.CustomPartitionVertex.VertexType; +import org.apache.hadoop.mapred.JobConf; + +public class MergeJoinWork extends BaseWork { + + private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES; + + private static final long serialVersionUID = 1L; + private CommonMergeJoinOperator mergeJoinOp = null; + private final List mergeWorkList = new ArrayList(); + private BaseWork bigTableWork; + private transient Map, Integer> opPosMap; + + public MergeJoinWork() { + super(); + opPosMap = new HashMap, Integer>(); + } + + public MergeJoinWork(BaseWork work) { + super(work.getName()); + opPosMap = new HashMap, Integer>(); + } + + @Explain(displayName = "Vertex") + @Override + public String getName() { + return super.getName(); + } + + @Override + public void replaceRoots(Map, Operator> replacementMap) { + getMainWork().replaceRoots(replacementMap); + } + + @Override + public Set> getAllRootOperators() { + return new HashSet>(); + } + + @Override + public void configureJobConf(JobConf job) { + } + + public CommonMergeJoinOperator getMergeJoinOperator() { + return this.mergeJoinOp; + } + + public void setMergeJoinOperator(CommonMergeJoinOperator mergeJoinOp) { + this.mergeJoinOp = mergeJoinOp; + } + + public void addMergedWork(BaseWork work, BaseWork connectWork) { + if (work != null) { + if ((bigTableWork != null) && (bigTableWork != work)) { + assert false; + } + this.bigTableWork = work; + setName(work.getName()); + if (work instanceof MapWork) { + MapWork mapWork = (MapWork) work; + setVertexType(VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + mapWork.setDoSplitsGrouping(false); + } + } + + if (connectWork != null) { + this.mergeWorkList.add(connectWork); + } + } + + public List getBaseWorkList() { + return mergeWorkList; + } + + public String getBigTableAlias() { + return ((MapWork) bigTableWork).getAliasToWork().keySet().iterator().next(); + } + + public BaseWork getMainWork() { + return bigTableWork; + } + + public Map, Integer> getOpPosMap() { + return opPosMap; + } + + @Override + public void setDummyOps(List dummyOps) { + getMainWork().setDummyOps(dummyOps); + } + + @Override + public void addDummyOp(HashTableDummyOperator dummyOp) { + getMainWork().addDummyOp(dummyOp); + } + + public void setVertexType(VertexType incomingVertexType) { + switch (this.vertexType) { + case INITIALIZED_EDGES: + if (incomingVertexType == VertexType.MULTI_INPUT_UNINITIALIZED_EDGES) { + vertexType = VertexType.MULTI_INPUT_INITIALIZED_EDGES; + } + break; + + case MULTI_INPUT_INITIALIZED_EDGES: + // nothing to do + break; + + case MULTI_INPUT_UNINITIALIZED_EDGES: + if (incomingVertexType == VertexType.INITIALIZED_EDGES) { + vertexType = VertexType.MULTI_INPUT_INITIALIZED_EDGES; + } + break; + + case AUTO_INITIALIZED_EDGES: + vertexType = incomingVertexType; + break; + + default: + break; + } + } + + public VertexType getVertexType() { + return this.vertexType; + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java index c96fc2d..b689afb 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java @@ -51,7 +51,7 @@ protected MyField() { super(); } - + public MyField(int fieldID, String fieldName, ObjectInspector fieldObjectInspector) { 
       this.fieldID = fieldID;
@@ -65,18 +65,22 @@ public MyField(int fieldID, String fieldName,
       this.fieldComment = fieldComment;
     }
 
+    @Override
     public int getFieldID() {
       return fieldID;
     }
 
+    @Override
     public String getFieldName() {
       return fieldName;
     }
 
+    @Override
     public ObjectInspector getFieldObjectInspector() {
       return fieldObjectInspector;
     }
 
+    @Override
     public String getFieldComment() {
       return fieldComment;
     }
@@ -133,10 +137,12 @@ protected void init(List<StructField> fields) {
     }
   }
 
+  @Override
   public String getTypeName() {
     return ObjectInspectorUtils.getStandardStructTypeName(this);
   }
 
+  @Override
   public final Category getCategory() {
     return Category.STRUCT;
   }
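Finally, MergeJoinWork.setVertexType above folds each incoming hint into the vertex's current type, so a vertex that needs both multiple inputs and custom initialized edges ends up as MULTI_INPUT_INITIALIZED_EDGES. The same transition rules restated as a stand-alone function; the enum is redeclared locally so the sketch compiles on its own (in the patch it lives in CustomPartitionVertex):

public class VertexTypeMergeSketch {

  // Local stand-in for CustomPartitionVertex.VertexType.
  enum VertexType {
    AUTO_INITIALIZED_EDGES,
    INITIALIZED_EDGES,
    MULTI_INPUT_UNINITIALIZED_EDGES,
    MULTI_INPUT_INITIALIZED_EDGES
  }

  static VertexType merge(VertexType current, VertexType incoming) {
    switch (current) {
    case INITIALIZED_EDGES:
      // gaining a multi-input requirement upgrades the vertex
      return incoming == VertexType.MULTI_INPUT_UNINITIALIZED_EDGES
          ? VertexType.MULTI_INPUT_INITIALIZED_EDGES : current;
    case MULTI_INPUT_UNINITIALIZED_EDGES:
      // gaining an initialized custom edge upgrades the vertex
      return incoming == VertexType.INITIALIZED_EDGES
          ? VertexType.MULTI_INPUT_INITIALIZED_EDGES : current;
    case MULTI_INPUT_INITIALIZED_EDGES:
      return current;   // already the most specific state
    case AUTO_INITIALIZED_EDGES:
      return incoming;  // first concrete hint wins
    default:
      return current;
    }
  }

  public static void main(String[] args) {
    // Prints MULTI_INPUT_INITIALIZED_EDGES
    System.out.println(merge(VertexType.INITIALIZED_EDGES,
        VertexType.MULTI_INPUT_UNINITIALIZED_EDGES));
  }
}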