diff --git ql/if/queryplan.thrift ql/if/queryplan.thrift index eafbe5a..dbf35e6 100644 --- ql/if/queryplan.thrift +++ ql/if/queryplan.thrift @@ -56,6 +56,7 @@ enum OperatorType { PTF, MUX, DEMUX, + MERGEJOIN, } struct Operator { diff --git ql/src/gen/thrift/gen-cpp/queryplan_types.cpp ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index 96dbb29..34e9007 100644 --- ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -51,7 +51,8 @@ int _kOperatorTypeValues[] = { OperatorType::HASHTABLEDUMMY, OperatorType::PTF, OperatorType::MUX, - OperatorType::DEMUX + OperatorType::DEMUX, + OperatorType::MERGEJOIN }; const char* _kOperatorTypeNames[] = { "JOIN", @@ -74,9 +75,10 @@ const char* _kOperatorTypeNames[] = { "HASHTABLEDUMMY", "PTF", "MUX", - "DEMUX" + "DEMUX", + "MERGEJOIN" }; -const std::map _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _kTaskTypeValues[] = { TaskType::MAP, diff --git ql/src/gen/thrift/gen-cpp/queryplan_types.h ql/src/gen/thrift/gen-cpp/queryplan_types.h index 634dd55..71b5c88 100644 --- ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -56,7 +56,8 @@ struct OperatorType { HASHTABLEDUMMY = 17, PTF = 18, MUX = 19, - DEMUX = 20 + DEMUX = 20, + MERGEJOIN = 21 }; }; diff --git ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java index aa094ee..b286e06 100644 --- ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java +++ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java @@ -32,7 +32,8 @@ HASHTABLEDUMMY(17), PTF(18), MUX(19), - DEMUX(20); + DEMUX(20), + MERGEJOIN(21); private final int value; @@ -95,6 +96,8 @@ public static OperatorType findByValue(int value) { return MUX; case 20: return DEMUX; + case 21: + return MERGEJOIN; default: return null; } diff --git ql/src/gen/thrift/gen-php/Types.php ql/src/gen/thrift/gen-php/Types.php index 5164b2c..f97ebf8 100644 --- ql/src/gen/thrift/gen-php/Types.php +++ ql/src/gen/thrift/gen-php/Types.php @@ -56,6 +56,7 @@ final class OperatorType { const PTF = 18; const MUX = 19; const DEMUX = 20; + const MERGEJOIN = 21; static public $__names = array( 0 => 'JOIN', 1 => 'MAPJOIN', @@ -78,6 +79,7 @@ final class OperatorType { 18 => 'PTF', 19 => 'MUX', 20 => 'DEMUX', + 21 => 'MERGEJOIN', ); } diff --git ql/src/gen/thrift/gen-py/queryplan/ttypes.py ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 2a3f745..6c8970c 100644 --- ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -66,6 +66,7 @@ class OperatorType: PTF = 18 MUX = 19 DEMUX = 20 + MERGEJOIN = 21 _VALUES_TO_NAMES = { 0: "JOIN", @@ -89,6 +90,7 @@ class OperatorType: 18: "PTF", 19: "MUX", 20: "DEMUX", + 21: "MERGEJOIN", } _NAMES_TO_VALUES = { @@ -113,6 +115,7 @@ class OperatorType: "PTF": 18, "MUX": 19, "DEMUX": 20, + "MERGEJOIN": 21, } class TaskType: diff --git ql/src/gen/thrift/gen-rb/queryplan_types.rb ql/src/gen/thrift/gen-rb/queryplan_types.rb index 35e1f0f..82ae7a5 100644 --- ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -42,8 +42,9 @@ module 
OperatorType PTF = 18 MUX = 19 DEMUX = 20 - VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX"} - VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX]).freeze + MERGEJOIN = 21 + VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "MERGEJOIN"} + VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, MERGEJOIN]).freeze end module TaskType diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java index 8c1067e..a0f91e0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java @@ -52,6 +52,7 @@ public AbstractMapJoinOperator() { + super(); } public AbstractMapJoinOperator(AbstractMapJoinOperator mjop) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 03194a4..04cc020 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -124,6 +124,7 @@ protected static final int NOTSKIPBIGTABLE = -1; public CommonJoinOperator() { + super(); } public CommonJoinOperator(CommonJoinOperator clone) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java new file mode 100644 index 0000000..123c939 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; + +public class CommonMergeJoinOperator extends AbstractMapJoinOperator implements + Serializable { + + private static final long serialVersionUID = 1L; + JoinOperator joinOp; + private boolean isBigTableWork; + private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName()); + private Map aliasToInputNameMap; + Map> aliasToKeyMap = new HashMap>(); + + public CommonMergeJoinOperator() { + this.joinOp = null; + } + + public CommonMergeJoinOperator(JoinOperator joinOp) { + super(); + this.joinOp = joinOp; + } + + @Override + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hive.ql.exec.Operator#processOp(java.lang.Object, + * int) this processor has a push-pull model. First call to this method is a + * push but the rest is pulled until we run out of records. + */ + @Override + public void processOp(Object row, int tag) throws HiveException { + LOG.info("We are in the processing phase. Tag is " + tag); + LOG.info("Is custom merge? " + getConf().getCustomMerge() + " Big table position? " + + getConf().getBigTablePosition() + " Number of buckets? " + getConf().getNumBuckets()); + // HACK FIXME need to use the algorithm below + if (this.getParentOperators() == null) { + joinOp.setChildOperators(this.getChildOperators()); + joinOp.processOp(row, tag); + } + /* + * this is the algorithm for this join. Once we get a row, we get all rows + * corresponding to the key from the other table(s). Once we have all the + * matching rows, we continue to fetch the rows from the current table until + * we run out of keys that match the current one. Then we call + * checkAndGenObject to produce and forward the joined rows. + */ + List key = mergeJoinComputeKeys(row, alias); + LOG.info("alias is " + alias + " row is null? " + (row == null) + + " this storage for alias is null? " + (storage == null)); + List value = getFilteredValue(alias, row); + this.storage[alias].addRow(value); + + try { + for (byte i = 0; i < this.getParentOperators().size(); i++) { + if (tag == conf.getBigTablePosition()) { + continue; + } + + int keyCompare = compareKeys(key, aliasToKeyMap.get(i)); + if (keyCompare < 0) { + return; + } else if (keyCompare > 0) { + this.storage[i].clearRows(); + } + + // fetch the rows from the other tables having same key as the current + // row being processed. 
+ Object rowOfSide = getParentOperators().get(i).getNextRow(aliasToInputNameMap.get(i)); + while (rowOfSide != null) { + List values = getFilteredValue(i, rowOfSide); + List otherKey = mergeJoinComputeKeys(rowOfSide, i); + int comparedKey = compareKeys(key, otherKey); + if (comparedKey < 0) { + this.storage[alias].clearRows(); + aliasToKeyMap.put(i, otherKey); + this.storage[i].addRow(values); + return; + } else if (comparedKey > 0) { + continue; + } + this.storage[i].addRow(values); + rowOfSide = getParentOperators().get(i).getNextRow(aliasToInputNameMap.get(i)); + } + } + } catch (Exception e) { + throw new HiveException(e); + } + checkAndGenObject(); + } + + private int compareKeys(List k1, List k2) { + int ret = 0; + + // join keys have difference sizes? + ret = k1.size() - k2.size(); + if (ret != 0) { + return ret; + } + + for (int i = 0; i < k1.size(); i++) { + WritableComparable key_1 = (WritableComparable) k1.get(i); + WritableComparable key_2 = (WritableComparable) k2.get(i); + if (key_1 == null && key_2 == null) { + return nullsafes != null && nullsafes[i] ? 0 : -1; // just return k1 is + // smaller than k2 + } else if (key_1 == null) { + return -1; + } else if (key_2 == null) { + return 1; + } + ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2); + if (ret != 0) { + return ret; + } + } + return ret; + } + + private List mergeJoinComputeKeys(Object row, Byte alias) throws HiveException { + return null; + } + + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "MERGEJOIN"; + } + + @Override + public OperatorType getType() { + return OperatorType.MERGEJOIN; + } + + @Override + public void initializeLocalWork(Configuration hconf) throws HiveException { + return; + } + + public void setJoinOp(JoinOperator joinOp) { + this.joinOp = joinOp; + } + + public boolean isBigTableWork() { + return isBigTableWork; + } + + public void setIsBigTableWork(boolean bigTableWork) { + this.isBigTableWork = bigTableWork; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index d5de58e..05a0f52 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -33,8 +33,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor; +import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -180,7 +181,7 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, PartitionDesc pd = ctx.partDesc; TableDesc td = pd.getTableDesc(); - + MapOpCtx opCtx = new MapOpCtx(); // Use table properties in case of unpartitioned tables, // and the union of table properties and partition properties, with partition @@ -204,42 +205,42 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter( partRawRowObjectInspector, opCtx.tblRawRowObjectInspector); - + // Next check if this table has partitions and if so // get the list of partition names as well as 
allocate // the serdes for the partition columns String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - + if (pcols != null && pcols.length() > 0) { String[] partKeys = pcols.trim().split("/"); String pcolTypes = overlayedProps .getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); String[] partKeyTypes = pcolTypes.trim().split(":"); - + if (partKeys.length > partKeyTypes.length) { throw new HiveException("Internal error : partKeys length, " +partKeys.length + " greater than partKeyTypes length, " + partKeyTypes.length); } - + List partNames = new ArrayList(partKeys.length); Object[] partValues = new Object[partKeys.length]; List partObjectInspectors = new ArrayList(partKeys.length); - + for (int i = 0; i < partKeys.length; i++) { String key = partKeys[i]; partNames.add(key); ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector (TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i])); - + // Partitions do not exist for this table if (partSpec == null) { // for partitionless table, initialize partValue to null partValues[i] = null; } else { - partValues[i] = + partValues[i] = ObjectInspectorConverters. getConverter(PrimitiveObjectInspectorFactory. - javaStringObjectInspector, oi).convert(partSpec.get(key)); + javaStringObjectInspector, oi).convert(partSpec.get(key)); } partObjectInspectors.add(oi); } @@ -340,6 +341,56 @@ private boolean isPartitioned(PartitionDesc pd) { return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty(); } + public void setChildrenSecondary(Configuration hconf) throws HiveException { + + List> children = + new ArrayList>(); + + Map convertedOI = getConvertedOI(hconf); + try { + for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + String onefile = entry.getKey(); + List aliases = entry.getValue(); + + Path onepath = new Path(onefile); + + PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); + + for (String onealias : aliases) { + Operator op = conf.getAliasToWork().get(onealias); + MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); + if (opCtxMap.containsKey(inp)) { + continue; + } + MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI); + opCtxMap.put(inp, opCtx); + + op.setParentOperators(new ArrayList>()); + op.getParentOperators().add(this); + LOG.info("MapOPerator: operator is " + op); + // check for the operators who will process rows coming to this Map + // Operator + children.add(op); + childrenOpToOpCtxMap.put(op, opCtx); + LOG.info("dump " + op + " " + + opCtxMap.get(inp).rowObjectInspector.getTypeName()); + current = opCtx; // just need for TestOperators.testMapOperator + } + } + + if (children.size() == 0) { + // didn't find match for input file path in configuration! + // serious problem .. + throw new HiveException("Unable to set up children"); + } + + // we found all the operators that we are supposed to process. 
+ setChildOperators(children); + } catch (Exception e) { + throw new HiveException(e); + } + } + public void setChildren(Configuration hconf) throws HiveException { Path fpath = IOContext.get().getInputPath(); @@ -366,7 +417,7 @@ public void setChildren(Configuration hconf) throws HiveException { for (String onealias : aliases) { Operator op = conf.getAliasToWork().get(onealias); if (LOG.isDebugEnabled()) { - LOG.debug("Adding alias " + onealias + " to work list for file " + LOG.info("Adding alias " + onealias + " to work list for file " + onefile); } MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); @@ -625,4 +676,10 @@ public OperatorType getType() { return null; } + @Override + public Object getNextRow(String inputName) throws Exception { + // This map operator definitely belongs to merge work. + Object nextRow = MapRecordProcessor.getNextRow(inputName); + return nextRow; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index db94271..2d474d6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -146,6 +146,7 @@ public int getNumChild() { /** * Implements the getChildren function for the Node Interface. */ + @Override public ArrayList getChildren() { if (getChildOperators() == null) { @@ -450,6 +451,8 @@ public void passExecContext(ExecMapperContext execContext) { */ protected void initialize(Configuration hconf, ObjectInspector inputOI, int parentId) throws HiveException { + LOG.info("id is " + id); + LOG.info("name is " + getName()); LOG.info("Initializing child " + id + " " + getName()); // Double the size of the array if needed if (parentId >= inputObjInspectors.length) { @@ -851,6 +854,7 @@ public void logStats() { * * @return the name of the operator */ + @Override public String getName() { return getOperatorName(); } @@ -1061,7 +1065,7 @@ public boolean supportSkewJoinOptimization() { if (parents != null) { for (Operator parent : parents) { - parentClones.add((Operator)(parent.clone())); + parentClones.add((parent.clone())); } } @@ -1082,8 +1086,8 @@ public boolean supportSkewJoinOptimization() { public Operator cloneOp() throws CloneNotSupportedException { T descClone = (T) conf.clone(); Operator ret = - (Operator) OperatorFactory.getAndMakeChild( - descClone, getSchema()); + OperatorFactory.getAndMakeChild( + descClone, getSchema()); return ret; } @@ -1254,15 +1258,15 @@ public Statistics getStatistics() { } return null; } - + public OpTraits getOpTraits() { if (conf != null) { return conf.getOpTraits(); } - + return null; } - + public void setOpTraits(OpTraits metaInfo) { if (LOG.isDebugEnabled()) { LOG.debug("Setting traits ("+metaInfo+") on "+this); @@ -1299,7 +1303,16 @@ public static Operator createDummy() { private static class DummyOperator extends Operator { public DummyOperator() { super("dummy"); } + @Override public void processOp(Object row, int tag) { } + @Override public OperatorType getType() { return null; } } + + public Object getNextRow(String string) throws Exception { + // all operators except the MapOperator just fetch and process the row. 
+ assert (getParentOperators().size() == 1); + Object row = getParentOperators().get(0).getNextRow(string); + return row; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index 5d41fa1..1d9bccc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -28,11 +28,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.CollectDesc; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.DemuxDesc; import org.apache.hadoop.hive.ql.plan.DummyStoreDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -114,6 +115,8 @@ public OpTuple(Class descClass, Class> opClass) { DemuxOperator.class)); opvec.add(new OpTuple(MuxDesc.class, MuxOperator.class)); + opvec.add(new OpTuple(CommonMergeJoinDesc.class, + CommonMergeJoinOperator.class)); } public static ArrayList vectorOpvec; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 4450ad3..c8b22b2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -197,6 +197,7 @@ public static String HADOOP_LOCAL_FS = "file:///"; public static String MAP_PLAN_NAME = "map.xml"; public static String REDUCE_PLAN_NAME = "reduce.xml"; + public static String MERGE_PLAN_NAME = "merge.xml"; public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; @@ -330,7 +331,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { } if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { - LOG.debug("Loading plan from string: "+path.toUri().getPath()); + LOG.info("Loading plan from string: " + path.toUri().getPath()); String planString = conf.get(path.toUri().getPath()); if (planString == null) { LOG.info("Could not find plan string in conf"); @@ -343,7 +344,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { in = new FileInputStream(localPath.toUri().getPath()); } - if(MAP_PLAN_NAME.equals(name)){ + if (MAP_PLAN_NAME.equals(name)) { if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ gWork = deserializePlan(in, MapWork.class, conf); } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { @@ -363,6 +364,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) { throw new RuntimeException("unable to determine work from configuration ." 
+ MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; } + } else if (MERGE_PLAN_NAME.equals(name)) { + gWork = deserializePlan(in, MapWork.class, conf); } gWorkMap.put(path, gWork); } else { @@ -586,14 +589,19 @@ public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScra } public static Path setMapWork(Configuration conf, MapWork w, Path hiveScratchDir, boolean useCache) { - return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache); + List baseWorkList = new ArrayList(); + baseWorkList.add(w); + return setBaseWork(conf, baseWorkList, hiveScratchDir, MAP_PLAN_NAME, useCache); } public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScratchDir, boolean useCache) { - return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache); + List baseWorkList = new ArrayList(); + baseWorkList.add(w); + return setBaseWork(conf, baseWorkList, hiveScratchDir, REDUCE_PLAN_NAME, useCache); } - private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) { + private static Path setBaseWork(Configuration conf, List workList, Path hiveScratchDir, + String name, boolean useCache) { try { setPlanPath(conf, hiveScratchDir); @@ -605,7 +613,9 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch // add it to the conf ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED)); - serializePlan(w, out, conf); + for (BaseWork w : workList) { + serializePlan(w, out, conf); + } LOG.info("Setting plan: "+planPath.toUri().getPath()); conf.set(planPath.toUri().getPath(), Base64.encodeBase64String(byteOut.toByteArray())); @@ -613,7 +623,9 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch // use the default file system of the conf FileSystem fs = planPath.getFileSystem(conf); out = fs.create(planPath); - serializePlan(w, out, conf); + for (BaseWork w : workList) { + serializePlan(w, out, conf); + } // Serialize the plan to the default hdfs instance // Except for hadoop local mode execution where we should be @@ -634,7 +646,9 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch } // Cache the plan in this process - gWorkMap.put(planPath, w); + if (workList.size() == 1) { + gWorkMap.put(planPath, workList.get(0)); + } return planPath; } catch (Exception e) { @@ -3424,7 +3438,7 @@ public static boolean createDirsWithPermission(Configuration conf, Path mkdir, return createDirsWithPermission(conf, mkdir, fsPermission, recursive); } - private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, + private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, String origUmask, FileSystem fs) throws IOException { if (unsetUmask) { if (origUmask != null) { @@ -3499,4 +3513,13 @@ public static String getQualifiedPath(HiveConf conf, Path path) throws HiveExcep public static boolean isDefaultNameNode(HiveConf conf) { return !conf.getChangedProperties().containsKey(HiveConf.ConfVars.HADOOPFS.varname); } + + public static Path setMergeWork(JobConf conf, List workList, Path mrScratchDir, + boolean b) { + return setBaseWork(conf, workList, mrScratchDir, MERGE_PLAN_NAME, b); + } + + public static MapWork getMergeWork(JobConf jconf) { + return (MapWork) getBaseWork(jconf, MERGE_PLAN_NAME); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index 222d723..a6290cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -31,7 +30,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.BaseWork.VertexType; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.mapred.FileSplit; @@ -78,9 +79,11 @@ private List dataInformationEvents; private int numBuckets = -1; private Configuration conf = null; - private boolean rootVertexInitialized = false; private final SplitGrouper grouper = new SplitGrouper(); private int taskCount = 0; + private VertexType vertexType; + private String inputNameDecidingParallelism; + public CustomPartitionVertex() { } @@ -88,8 +91,19 @@ public CustomPartitionVertex() { @Override public void initialize(VertexManagerPluginContext context) { this.context = context; - ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload()); - this.numBuckets = byteBuf.getInt(); + DataInputBuffer dib = new DataInputBuffer(); + byte[] payload = context.getUserPayload(); + dib.reset(payload, payload.length); + CustomVertexConfiguration customConf = new CustomVertexConfiguration(); + try { + customConf.readFields(dib); + } catch (IOException e) { + throw new RuntimeException(e); + } + + this.numBuckets = customConf.getNumBuckets(); + this.vertexType = customConf.getVertexType(); + this.inputNameDecidingParallelism = customConf.getInputName(); } @Override @@ -116,12 +130,6 @@ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List events) { - // Ideally, since there's only 1 Input expected at the moment - - // ensure this method is called only once. Tez will call it once per Root - // Input. - Preconditions.checkState(rootVertexInitialized == false); - LOG.info("Root vertex not initialized"); - rootVertexInitialized = true; try { // This is using the payload from the RootVertexInitializer corresponding // to InputName. Ideally it should be using it's own configuration class - @@ -131,6 +139,11 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload()); this.conf = MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes()); + if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES) + || (vertexType == VertexType.MULTI_INPUT_UNINITIALIZED_EDGES)) { + //createRoutingTable(); + } + /* * Currently in tez, the flow of events is thus: * "Generate Splits -> Initialize Vertex" (with parallelism info obtained @@ -165,9 +178,6 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr // No tasks should have been started yet. Checked by initial state // check. 
Preconditions.checkState(dataInformationEventSeen == false); - Preconditions - .checkState(context.getVertexNumTasks(context.getVertexName()) == -1, - "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism"); RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event; // The vertex cannot be configured until all DataEvents are seen - to @@ -253,11 +263,13 @@ private void processAllEvents(String inputName, // Construct the EdgeManager descriptor to be used by all edges which need // the routing table. - EdgeManagerDescriptor hiveEdgeManagerDesc = - new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); - byte[] payload = getBytePayload(bucketToTaskMap); - hiveEdgeManagerDesc.setUserPayload(payload); - + EdgeManagerDescriptor hiveEdgeManagerDesc = null; + if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES) + || (vertexType == VertexType.INITIALIZED_EDGES)) { + hiveEdgeManagerDesc = new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); + byte[] payload = getBytePayload(bucketToTaskMap); + hiveEdgeManagerDesc.setUserPayload(payload); + } Map emMap = Maps.newHashMap(); // Replace the edge manager for all vertices which have routing type custom. @@ -290,10 +302,13 @@ private void processAllEvents(String inputName, rootInputSpecUpdate.put( inputName, RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate()); - context.setVertexParallelism( - taskCount, - new VertexLocationHint(grouper.createTaskLocationHints(finalSplits - .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + if (inputNameDecidingParallelism.compareTo(inputName) == 0) { + LOG.info("ZZZ: input name deciding parallelism is " + inputName); + context.setVertexParallelism( + taskCount, + new VertexLocationHint(grouper.createTaskLocationHints(finalSplits + .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + } // Set the actual events for the tasks. context.addRootInputEvents(inputName, taskEvents); @@ -321,7 +336,8 @@ private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) thr if (!(inputSplit instanceof FileSplit)) { throw new UnsupportedOperationException( - "Cannot handle splits other than FileSplit for the moment"); + "Cannot handle splits other than FileSplit for the moment. 
Current input split type: " + + inputSplit.getClass().getSimpleName()); } return (FileSplit) inputSplit; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java new file mode 100644 index 0000000..d923fa9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java @@ -0,0 +1,50 @@ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.plan.BaseWork.VertexType; +import org.apache.hadoop.io.Writable; + +public class CustomVertexConfiguration implements Writable { + + private int numBuckets; + private VertexType vertexType; + private String inputName; + + public CustomVertexConfiguration() { + } + + public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) { + this.numBuckets = numBuckets; + this.vertexType = vertexType; + this.inputName = inputName; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(this.vertexType.ordinal()); + out.writeInt(this.numBuckets); + out.writeUTF(inputName); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.vertexType = VertexType.values()[in.readInt()]; + this.numBuckets = in.readInt(); + this.inputName = in.readUTF(); + } + + public int getNumBuckets() { + return numBuckets; + } + + public VertexType getVertexType() { + return vertexType; + } + + public String getInputName() { + return inputName; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index d964eb1..2bfce58 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -36,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -44,6 +47,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; @@ -57,7 +61,9 @@ public class MapRecordProcessor extends RecordProcessor { + private static KeyValueReader mergeQueue; private MapOperator mapOp; + private MapOperator mergeMapOp; public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); private final ExecMapperContext execContext = new ExecMapperContext(); private boolean abort = false; @@ -90,11 +96,13 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr ObjectCache cache = ObjectCacheFactory.getCache(jconf); try { + MapWork mergeMapWork = null; 
execContext.setJc(jconf); // create map and fetch operators mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); if (mapWork == null) { mapWork = Utilities.getMapWork(jconf); + mergeMapWork = Utilities.getMergeWork(jconf); cache.cache(MAP_PLAN_KEY, mapWork); l4j.info("Plan: "+mapWork); for (String s: mapWork.getAliases()) { @@ -109,16 +117,33 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr mapOp = new MapOperator(); } + if (mergeMapWork != null) { + if (mergeMapWork.getVectorMode()) { + mergeMapOp = new VectorMapOperator(); + } else { + mergeMapOp = new MapOperator(); + } + } + // initialize map operator mapOp.setConf(mapWork); mapOp.setChildren(jconf); - l4j.info(mapOp.dump(0)); + if (mergeMapOp != null) { + mergeMapOp.setConf(mergeMapWork); + mergeMapOp.setChildrenSecondary(jconf); + } + // l4j.info(mapOp.dump(0)); MapredContext.init(true, new JobConf(jconf)); ((TezContext)MapredContext.get()).setInputs(inputs); mapOp.setExecContext(execContext); mapOp.initializeLocalWork(jconf); mapOp.initialize(jconf, null); + if (mergeMapOp != null) { + mergeMapOp.setExecContext(execContext); + mergeMapOp.initializeLocalWork(jconf); + mergeMapOp.initialize(jconf, null); + } // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the @@ -149,7 +174,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr } @Override - void run() throws IOException{ + void run() throws IOException, InterruptedException { MRInputLegacy in = TezProcessor.getMRInput(inputs); KeyValueReader reader = in.getReader(); @@ -239,4 +264,20 @@ void close(){ MapredContext.close(); } } + + public static Object getNextRow(String inputName) throws Exception { + // get the next row from the priority queue corresponding to this input name + if (mergeQueue == null) { + MultiMRInput multiMRInput = TezProcessor.getInput(inputName); + Collection kvReaders = multiMRInput.getKeyValueReaders(); + List kvReaderList = new ArrayList(kvReaders); + mergeQueue = new KeyValueInputMerger(kvReaderList); + } + + if (mergeQueue.next()) { + return mergeQueue.getCurrentValue(); + } else { + return null; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index e884afd..ba8923e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -38,7 +37,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; -import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValuesInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -59,7 +58,6 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; import 
org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.processor.MRTaskReporter; @@ -120,6 +118,10 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr ObjectCache cache = ObjectCacheFactory.getCache(jconf); + for (Entry entry : inputs.entrySet()) { + l4j.info("REDUCER name : " + entry.getKey() + " Logical input type: " + entry.getValue()); + } + rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; @@ -140,7 +142,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr try { keyTableDesc = redWork.getKeyDesc(); - inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc .getDeserializerClass(), null); SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); keyObjectInspector = inputKeyDeserializer.getObjectInspector(); @@ -152,7 +154,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr keyStructInspector = (StructObjectInspector)keyObjectInspector; batches = new VectorizedRowBatch[maxTags]; valueStructInspectors = new StructObjectInspector[maxTags]; - valueStringWriters = (List[])new List[maxTags]; + valueStringWriters = new List[maxTags]; keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); buffer = new DataOutputBuffer(); } @@ -279,7 +281,7 @@ void run() throws Exception { kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader(); }else { //get a sort merged input - kvsReader = new InputMerger(shuffleInputs); + kvsReader = new KeyValuesInputMerger(shuffleInputs); } } catch (Exception e) { throw new IOException(e); @@ -293,7 +295,6 @@ void run() throws Exception { break; } } - } /** @@ -306,7 +307,7 @@ void run() throws Exception { Map tag2input = redWork.getTagToInput(); ArrayList shuffleInputs = new ArrayList(); for(String inpStr : tag2input.values()){ - shuffleInputs.add((LogicalInput)inputs.get(inpStr)); + shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 6839e34..112c902 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; import java.text.NumberFormat; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,6 +33,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalIOProcessor; @@ -59,6 +61,7 @@ private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); private TezProcessorContext processorContext; + private static Map multiMRInputMap; protected static final NumberFormat taskIdFormat = NumberFormat.getInstance(); protected static final NumberFormat jobIdFormat = NumberFormat.getInstance(); @@ -95,6 +98,7 @@ public void initialize(TezProcessorContext processorContext) Configuration conf = 
TezUtils.createConfFromUserPayload(userPayload); this.jobConf = new JobConf(conf); setupMRLegacyConfigs(processorContext); + multiMRInputMap = new HashMap(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } @@ -138,10 +142,12 @@ public void run(Map inputs, Map out if (isMap) { rproc = new MapRecordProcessor(); MRInputLegacy mrInput = getMRInput(inputs); - try { - mrInput.init(); - } catch (IOException e) { - throw new RuntimeException("Failed while initializing MRInput", e); + if (mrInput != null) { + try { + mrInput.init(); + } catch (IOException e) { + throw new RuntimeException("Failed while initializing MRInput", e); + } } } else { rproc = new ReduceRecordProcessor(); @@ -207,23 +213,38 @@ void initialize() throws Exception { this.writer = (KeyValueWriter) output.getWriter(); } + @Override public void collect(Object key, Object value) throws IOException { writer.write(key, value); } } - static MRInputLegacy getMRInput(Map inputs) { + static MRInputLegacy getMRInput(Map inputs) throws InterruptedException, + IOException { //there should be only one MRInput MRInputLegacy theMRInput = null; - for(LogicalInput inp : inputs.values()){ - if(inp instanceof MRInputLegacy){ + LOG.info("The input names are: " + Arrays.toString(inputs.keySet().toArray())); + for (Entry inp : inputs.entrySet()) { + if (inp.getValue() instanceof MRInputLegacy) { if(theMRInput != null){ throw new IllegalArgumentException("Only one MRInput is expected"); } //a better logic would be to find the alias - theMRInput = (MRInputLegacy)inp; + theMRInput = (MRInputLegacy) inp.getValue(); + } else if (inp.getValue() instanceof MultiMRInput) { + LOG.info("Found input type MultiMRInput"); + multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue()); + // Iterator mrInputKeyValueIterator = + // multiMRInput.getKeyValueReaders().iterator(); + // while (mrInputKeyValueIterator.hasNext()) { + // KeyValueReader kvReader = mrInputKeyValueIterator.next(); + // } } } return theMRInput; } + + public static MultiMRInput getInput(String inputName) { + return multiMRInputMap.get(inputName); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java new file mode 100644 index 0000000..30cdd51 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.library.api.KeyValueReader; + +/** + * A KeyValuesReader implementation that returns a sorted stream of key-values + * by doing a sorted merge of the key-value in LogicalInputs. + * Tags are in the last byte of the key, so no special handling for tags is required. + * Uses a priority queue to pick the KeyValuesReader of the input that is next in + * sort order. + */ +public class KeyValueInputMerger implements KeyValueReader { + + public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); + private PriorityQueue pQueue = null; + private KeyValueReader nextKVReader = null; + + public KeyValueInputMerger(List multiMRInputs) throws Exception { + //get KeyValuesReaders from the LogicalInput and add them to priority queue + int initialCapacity = multiMRInputs.size(); + pQueue = new PriorityQueue(initialCapacity, new KVReaderComparator()); + for (KeyValueReader input : multiMRInputs) { + addToQueue(input); + } + } + + /** + * Add KeyValueReader to queue if it has more key-value + * + * @param kvReader + * @throws IOException + */ + private void addToQueue(KeyValueReader kvReader) throws IOException { + if (kvReader.next()) { + pQueue.add(kvReader); + } + } + + /** + * @return true if there are more key-values and advances to next key-values + * @throws IOException + */ + @Override + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if(nextKVReader != null){ + addToQueue(nextKVReader); + } + + //get the new nextKVReader with lowest key + nextKVReader = pQueue.poll(); + return nextKVReader != null; + } + + @Override + public Object getCurrentKey() throws IOException { + return nextKVReader.getCurrentKey(); + } + + @Override + public Object getCurrentValue() throws IOException { + return nextKVReader.getCurrentValue(); + } + + /** + * Comparator that compares KeyValuesReader on their current key + */ + class KVReaderComparator implements Comparator { + + @Override + public int compare(KeyValueReader kvReadr1, KeyValueReader kvReadr2) { + try { + BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentValue(); + BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentValue(); + return key1.compareTo(key2); + } catch (IOException e) { + l4j.error("Caught exception while reading shuffle input", e); + //die! + throw new RuntimeException(e); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java new file mode 100644 index 0000000..f62bedb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValuesReader; + +/** + * A KeyValuesReader implementation that returns a sorted stream of key-values + * by doing a sorted merge of the key-value in LogicalInputs. + * Tags are in the last byte of the key, so no special handling for tags is required. + * Uses a priority queue to pick the KeyValuesReader of the input that is next in + * sort order. + */ +public class KeyValuesInputMerger implements KeyValuesReader { + + public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); + private PriorityQueue pQueue = null; + private KeyValuesReader nextKVReader = null; + + public KeyValuesInputMerger(List shuffleInputs) throws Exception { + //get KeyValuesReaders from the LogicalInput and add them to priority queue + int initialCapacity = shuffleInputs.size(); + pQueue = new PriorityQueue(initialCapacity, new KVReaderComparator()); + for(Input input : shuffleInputs){ + addToQueue((KeyValuesReader)input.getReader()); + } + } + + /** + * Add KeyValuesReader to queue if it has more key-values + * @param kvsReadr + * @throws IOException + */ + private void addToQueue(KeyValuesReader kvsReadr) throws IOException{ + if(kvsReadr.next()){ + pQueue.add(kvsReadr); + } + } + + /** + * @return true if there are more key-values and advances to next key-values + * @throws IOException + */ + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if(nextKVReader != null){ + addToQueue(nextKVReader); + } + + //get the new nextKVReader with lowest key + nextKVReader = pQueue.poll(); + return nextKVReader != null; + } + + public Object getCurrentKey() throws IOException { + return nextKVReader.getCurrentKey(); + } + + public Iterable getCurrentValues() throws IOException { + return nextKVReader.getCurrentValues(); + } + + /** + * Comparator that compares KeyValuesReader on their current key + */ + class KVReaderComparator implements Comparator { + + @Override + public int compare(KeyValuesReader kvReadr1, KeyValuesReader kvReadr2) { + try { + BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentKey(); + BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentKey(); + return key1.compareTo(key2); + } catch (IOException e) { + l4j.error("Caught exception while reading shuffle input", e); + //die! 
+ throw new RuntimeException(e); + } + } + } + + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java index cc4477f..f036c6c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java @@ -34,7 +34,7 @@ @Override public Reader getReader() throws Exception { - return new InputMerger(getInputs()); + return new KeyValuesInputMerger(getInputs()); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 6d9739f..d9610bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hive.ql.plan; -import java.util.LinkedList; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,6 +36,13 @@ @SuppressWarnings({"serial", "deprecation"}) public abstract class BaseWork extends AbstractOperatorDesc { + public enum VertexType { + AUTO_INITIALIZED_EDGES, // no custom vertex or edge + INITIALIZED_EDGES, // custom vertex and custom edge but single MR Input + MULTI_INPUT_INITIALIZED_EDGES, // custom vertex, custom edge and multi MR Input + MULTI_INPUT_UNINITIALIZED_EDGES // custom vertex, no custom edge, multi MR Input + } + // dummyOps is a reference to all the HashTableDummy operators in the // plan. These have to be separately initialized when we setup a task. // Their function is mainly as root ops to give the mapjoin the correct @@ -95,7 +102,7 @@ public void addDummyOp(HashTableDummyOperator dummyOp) { // add all children opStack.addAll(opSet); - + while(!opStack.empty()) { Operator op = opStack.pop(); returnSet.add(op); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java new file mode 100644 index 0000000..8b1159d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.Map; + +@Explain(displayName = "Common Merge Join Operator") +public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable { + private static final long serialVersionUID = 1L; + private int numBuckets; + private boolean isSubQuery; + private int mapJoinConversionPos; + private JoinDesc bigTableJoinDesc; + + CommonMergeJoinDesc() { + } + + public CommonMergeJoinDesc(int numBuckets, boolean isSubQuery, int mapJoinConversionPos, + MapJoinDesc joinDesc) { + super(joinDesc); + this.numBuckets = numBuckets; + this.isSubQuery = isSubQuery; + this.mapJoinConversionPos = mapJoinConversionPos; + } + + public boolean getCustomMerge() { + return isSubQuery; + } + + public int getNumBuckets() { + return numBuckets; + } + + public int getBigTablePosition() { + return mapJoinConversionPos; + } + + public Map getTagToAlias() { + // FIXME this is supposed to be populated in the planning phase. Has a + // parent index to input name mapping. + return null; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 1d96c5d..a2e40db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -26,9 +26,9 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -120,6 +120,10 @@ private Map> scratchColumnMap = null; private boolean vectorMode = false; + private boolean doSplitsGrouping = true; + + private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES; + public MapWork() {} public MapWork(String name) { @@ -555,4 +559,45 @@ public void logPathToAliases() { } } } + + public void setDoSplitsGrouping(boolean doSplitsGrouping) { + this.doSplitsGrouping = doSplitsGrouping; + } + + public boolean getDoSplitsGrouping() { + return this.doSplitsGrouping; + } + + public void setVertexType(VertexType incomingVertexType) { + switch (this.vertexType) { + case INITIALIZED_EDGES: + if (incomingVertexType == VertexType.MULTI_INPUT_UNINITIALIZED_EDGES) { + vertexType = VertexType.MULTI_INPUT_INITIALIZED_EDGES; + } + break; + + case MULTI_INPUT_INITIALIZED_EDGES: + // nothing to do + break; + + case MULTI_INPUT_UNINITIALIZED_EDGES: + if (incomingVertexType == VertexType.INITIALIZED_EDGES) { + vertexType = VertexType.MULTI_INPUT_INITIALIZED_EDGES; + } + break; + + case AUTO_INITIALIZED_EDGES: + vertexType = incomingVertexType; + break; + + default: + break; + } + this.vertexType = vertexType; + } + + public VertexType getVertexType() { + // TODO Auto-generated method stub + return this.vertexType; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java new file mode 100644 index 0000000..59dd752 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java @@ -0,0 +1,90 @@ +package org.apache.hadoop.hive.ql.plan; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.mapred.JobConf; + +public class MergeJoinWork extends BaseWork { + + 
private final List mergeJoinOpList = + new ArrayList(); + private final List mergeWorkList = new ArrayList(); + private int bigTableInputId; + private boolean isBigTableWork; + private BaseWork bigTableWork; + + public MergeJoinWork() { + super(); + } + + public MergeJoinWork(BaseWork work) { + super(work.getName()); + } + + @Explain(displayName = "Vertex") + @Override + public String getName() { + return super.getName(); + } + + @Override + public void replaceRoots(Map, Operator> replacementMap) { + } + + @Override + public Set> getAllRootOperators() { + return new HashSet>(); + } + + @Override + public void configureJobConf(JobConf job) { + } + + public List getMergeJoinOperator() { + return this.mergeJoinOpList; + } + + private void addMergeJoinOperator(CommonMergeJoinOperator mergeJoinOp) { + this.mergeJoinOpList.add(mergeJoinOp); + this.bigTableInputId = mergeJoinOp.getConf().getBigTablePosition(); + } + + public void + addMergedWork(BaseWork work, BaseWork connectWork, CommonMergeJoinOperator mergeJoinOp) { + addMergeJoinOperator(mergeJoinOp); + if (work != null) { + if ((bigTableWork != null) && (bigTableWork != work)) { + assert false; + } + this.bigTableWork = work; + setName(work.getName()); + if (work instanceof MapWork) { + MapWork mapWork = (MapWork) work; + mapWork.setVertexType(VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + mapWork.setDoSplitsGrouping(false); + } + } + + if (connectWork != null) { + this.mergeWorkList.add(connectWork); + } + } + + public List getBaseWorkList() { + return mergeWorkList; + } + + public String getBigTableAlias() { + return ((MapWork) bigTableWork).getAliasToWork().keySet().iterator().next(); + } + + public BaseWork getMainWork() { + return bigTableWork; + } +} diff --git ql/src/test/queries/clientpositive/tez_smb_1.q ql/src/test/queries/clientpositive/tez_smb_1.q new file mode 100644 index 0000000..a44cee1 --- /dev/null +++ ql/src/test/queries/clientpositive/tez_smb_1.q @@ -0,0 +1,38 @@ +explain +select a.key, a.value from src a join src b where a.key = b.key; + +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=10000; + +CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); + +set hive.enforce.bucketing=true; +set hive.enforce.sorting = true; +set hive.optimize.bucketingsorting=false; +insert overwrite table tab_part partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin_part; + 
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +insert overwrite table tab partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin; + +set hive.convert.join.bucket.mapjoin.tez = false; +set hive.auto.convert.sortmerge.join = true; + +explain +select a.key, a.value, b.value +from tab a join tab_part b on a.key = b.key; + +select a.key, a.value, b.value +from tab a join tab_part b on a.key = b.key;
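
Note for reviewers (not part of the patch): the two new merger classes above, KeyValueInputMerger and KeyValuesInputMerger, use the same k-way merge pattern — keep one reader per input in a PriorityQueue ordered by that reader's current element, poll the smallest, and re-offer the reader after its element is consumed. The standalone sketch below illustrates that pattern with plain sorted iterators; MergingReaderSketch, SortedSource and the String elements are illustrative stand-ins, not code from the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergingReaderSketch {

  // Wraps one sorted input and exposes its current element (analogous to a KeyValueReader).
  static final class SortedSource {
    private final Iterator<String> it;
    private String current;

    SortedSource(Iterator<String> it) {
      this.it = it;
    }

    boolean advance() {          // analogous to KeyValueReader.next()
      if (!it.hasNext()) {
        return false;
      }
      current = it.next();
      return true;
    }

    String current() {           // analogous to getCurrentKey()/getCurrentValue()
      return current;
    }
  }

  private final PriorityQueue<SortedSource> pQueue;
  private SortedSource nextSource;

  MergingReaderSketch(List<List<String>> sortedInputs) {
    pQueue = new PriorityQueue<SortedSource>(sortedInputs.size(),
        new Comparator<SortedSource>() {
          @Override
          public int compare(SortedSource a, SortedSource b) {
            return a.current().compareTo(b.current());
          }
        });
    for (List<String> input : sortedInputs) {
      addToQueue(new SortedSource(input.iterator()));
    }
  }

  // Only re-add a source that still has elements, mirroring addToQueue() in the patch.
  private void addToQueue(SortedSource source) {
    if (source.advance()) {
      pQueue.add(source);
    }
  }

  // True if another element is available; afterwards current() returns the smallest one.
  boolean next() {
    if (nextSource != null) {
      addToQueue(nextSource);    // re-offer the input we just consumed from
    }
    nextSource = pQueue.poll();
    return nextSource != null;
  }

  String current() {
    return nextSource.current();
  }

  public static void main(String[] args) {
    List<List<String>> inputs = new ArrayList<List<String>>();
    inputs.add(Arrays.asList("a", "d", "f"));
    inputs.add(Arrays.asList("b", "c", "g"));
    MergingReaderSketch merger = new MergingReaderSketch(inputs);
    while (merger.next()) {
      System.out.println(merger.current());  // prints a, b, c, d, f, g
    }
  }
}

One detail worth double-checking in the patch itself: KeyValueInputMerger's KVReaderComparator orders readers by getCurrentValue(), while KeyValuesInputMerger orders by getCurrentKey(); if the key/value distinction matters for the merge-join ordering, the former may warrant a second look.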
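
A second note on the payload change in CustomPartitionVertex: initialize() now reads a Writable (CustomVertexConfiguration carrying numBuckets, VertexType and an input name) from the user payload instead of a single int from a ByteBuffer. The sketch below shows the write/read round-trip under that assumption, using a simplified stand-in class (VertexPayloadSketch, numBuckets plus inputName only) so it stays self-contained; it is not the patch's class, and the real one also serializes the VertexType ordinal first, which ties the wire format to the enum declaration order.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

public class VertexPayloadSketch implements Writable {

  private int numBuckets;
  private String inputName;

  public VertexPayloadSketch() {                      // no-arg constructor needed for readFields()
  }

  public VertexPayloadSketch(int numBuckets, String inputName) {
    this.numBuckets = numBuckets;
    this.inputName = inputName;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(numBuckets);
    out.writeUTF(inputName);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    numBuckets = in.readInt();
    inputName = in.readUTF();
  }

  public static void main(String[] args) throws IOException {
    // Producer side (plan generation): serialize the configuration into a byte[] payload.
    DataOutputBuffer out = new DataOutputBuffer();
    new VertexPayloadSketch(4, "Map 1").write(out);
    byte[] payload = Arrays.copyOf(out.getData(), out.getLength());

    // Consumer side (vertex manager initialize()): read it back from the raw payload bytes,
    // mirroring the DataInputBuffer.reset(payload, payload.length) call in the patch.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(payload, payload.length);
    VertexPayloadSketch conf = new VertexPayloadSketch();
    conf.readFields(in);
    System.out.println(conf.numBuckets + " / " + conf.inputName);  // 4 / Map 1
  }
}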