diff --git eclipse-templates/.classpath eclipse-templates/.classpath index 87ca207..d131007 100644 --- eclipse-templates/.classpath +++ eclipse-templates/.classpath @@ -50,6 +50,7 @@ + @@ -93,15 +94,15 @@ - - - + + + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 8633321..53e6b28 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -68,6 +68,20 @@ public MapJoinOperator(AbstractMapJoinOperator mjop) { super(mjop); } + /* + * We need the base (operator.java) implementation of start/endGroup. + * The parent class has functionality in those that map join can't use. + */ + @Override + public void endGroup() throws HiveException { + defaultEndGroup(); + } + + @Override + public void startGroup() throws HiveException { + defaultStartGroup(); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); @@ -126,7 +140,8 @@ public void generateMapMetaData() throws HiveException, SerDeException { private void loadHashTable() throws HiveException { - if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) { + if (this.getExecContext().getLocalWork() == null + || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) { if (hashTblInitedOnce) { return; } else { @@ -159,8 +174,8 @@ public void cleanUpInputFileChangedOp() throws HiveException { public void processOp(Object row, int tag) throws HiveException { try { if (firstRow) { - // generate the map metadata generateMapMetaData(); + loadHashTable(); firstRow = false; } alias = (byte)tag; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 8e48a82..0ff4954 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -515,8 +515,7 @@ public void process(Object row, int tag) throws HiveException { } } - // If a operator wants to do some work at the beginning of a group - public void startGroup() throws HiveException { + protected final void defaultStartGroup() throws HiveException { LOG.debug("Starting group"); if (childOperators == null) { @@ -535,8 +534,7 @@ public void startGroup() throws HiveException { LOG.debug("Start group Done"); } - // If an operator wants to do some work at the end of a group - public void endGroup() throws HiveException { + protected final void defaultEndGroup() throws HiveException { LOG.debug("Ending group"); if (childOperators == null) { @@ -555,6 +553,16 @@ public void endGroup() throws HiveException { LOG.debug("End group Done"); } + // If a operator wants to do some work at the beginning of a group + public void startGroup() throws HiveException { + defaultStartGroup(); + } + + // If an operator wants to do some work at the end of a group + public void endGroup() throws HiveException { + defaultEndGroup(); + } + // an blocking operator (e.g. 
GroupByOperator and JoinOperator) can // override this method to forward its outputs public void flush() throws HiveException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java index b2c5ddc..2ac0928 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java @@ -79,21 +79,27 @@ public boolean equals(Object obj) { return false; return true; } - @SuppressWarnings("unchecked") - public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) + + public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) throws IOException, SerDeException { - SerDe serde = context.getSerDe(); container.readFields(in); + read(context, container); + } + + @SuppressWarnings("unchecked") + public void read(MapJoinObjectSerDeContext context, Writable container) throws SerDeException { + SerDe serde = context.getSerDe(); List value = (List)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); + if(value == null) { key = EMPTY_OBJECT_ARRAY; } else { key = value.toArray(); } } - - public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) + + public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) throws IOException, SerDeException { SerDe serde = context.getSerDe(); ObjectInspector objectInspector = context.getStandardOI(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java index cab3db3..b7f66fe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java @@ -104,30 +104,34 @@ public MapJoinRowContainer copy() { } return result; } - - @SuppressWarnings({"unchecked"}) - public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) + + public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) throws IOException, SerDeException { clear(); - SerDe serde = context.getSerDe(); long numRows = in.readLong(); for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) { - container.readFields(in); - List value = (List)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container), - serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); - if(value == null) { - add(toList(EMPTY_OBJECT_ARRAY)); - } else { - Object[] valuesArray = value.toArray(); - if (context.hasFilterTag()) { - aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get(); - } - add(toList(valuesArray)); + container.readFields(in); + read(context, container); + } + } + + @SuppressWarnings("unchecked") + public void read(MapJoinObjectSerDeContext context, Writable currentValue) throws SerDeException { + SerDe serde = context.getSerDe(); + List value = (List)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(currentValue), + serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); + if(value == null) { + add(toList(EMPTY_OBJECT_ARRAY)); + } else { + Object[] valuesArray = value.toArray(); + if (context.hasFilterTag()) { + aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get(); } + add(toList(valuesArray)); } } - - public 
void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) + + public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) throws IOException, SerDeException { SerDe serde = context.getSerDe(); ObjectInspector valueObjectInspector = context.getStandardOI(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java index 06151d5..83ba0f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java @@ -40,6 +40,14 @@ public MapJoinTableContainerSerDe(MapJoinObjectSerDeContext keyContext, this.keyContext = keyContext; this.valueContext = valueContext; } + + public MapJoinObjectSerDeContext getKeyContext() { + return keyContext; + } + public MapJoinObjectSerDeContext getValueContext() { + return valueContext; + } + @SuppressWarnings({"unchecked"}) public MapJoinTableContainer load(ObjectInputStream in) throws HiveException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 2ec5561..8153112 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -17,18 +17,27 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.IOException; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.io.Writable; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValueReader; /** * HashTableLoader for Tez constructs the hashtable from records read from @@ -41,7 +50,6 @@ public HashTableLoader() { } - @SuppressWarnings("unused") @Override public void load(ExecMapperContext context, Configuration hconf, @@ -49,8 +57,43 @@ public void load(ExecMapperContext context, byte posBigTable, MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { + TezContext tezContext = (TezContext) MapredContext.get(); Map parentToInput = desc.getParentToInput(); - } + int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD); + float hashTableLoadFactor = HiveConf.getFloatVar(hconf, + HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR); + + for (int pos = 0; pos < mapJoinTables.length; pos++) { + if (pos == posBigTable) { + continue; + } + + LogicalInput input = tezContext.getInput(parentToInput.get(pos)); + try { + KeyValueReader kvReader = (KeyValueReader) input.getReader(); + + 
MapJoinTableContainer tableContainer = new HashMapWrapper(hashTableThreshold, + hashTableLoadFactor); + + // simply read all the kv pairs into the hashtable. + while (kvReader.next()) { + MapJoinKey key = new MapJoinKey(); + key.read(mapJoinTableSerdes[pos].getKeyContext(), (Writable)kvReader.getCurrentKey()); + MapJoinRowContainer values = new MapJoinRowContainer(); + values.read(mapJoinTableSerdes[pos].getValueContext(), (Writable)kvReader.getCurrentValue()); + tableContainer.put(key, values); + } + + mapJoinTables[pos] = tableContainer; + } catch (IOException e) { + throw new HiveException(e); + } catch (SerDeException e) { + throw new HiveException(e); + } catch (Exception e) { + throw new HiveException(e); + } + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 23400e4..f643c0f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -18,20 +18,24 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.ObjectCache; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -53,6 +57,7 @@ private final ExecMapperContext execContext = new ExecMapperContext(); private boolean abort = false; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; + private MapWork mapWork; @Override void init(JobConf jconf, MRTaskReporter mrReporter, Map inputs, @@ -73,15 +78,15 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in execContext.setJc(jconf); // create map and fetch operators - MapWork mrwork = (MapWork) cache.retrieve(MAP_PLAN_KEY); - if (mrwork == null) { - mrwork = Utilities.getMapWork(jconf); - cache.cache(MAP_PLAN_KEY, mrwork); + mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); + if (mapWork == null) { + mapWork = Utilities.getMapWork(jconf); + cache.cache(MAP_PLAN_KEY, mapWork); } mapOp = new MapOperator(); // initialize map operator - mapOp.setConf(mrwork); + mapOp.setConf(mapWork); mapOp.setChildren(jconf); l4j.info(mapOp.dump(0)); @@ -91,6 +96,17 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in mapOp.initializeLocalWork(jconf); mapOp.initialize(jconf, null); + // Initialization isn't finished until all parents of all operators + // are initialized. For broadcast joins that means initializing the + // dummy parent operators as well. 
+ List dummyOps = mapWork.getDummyOps(); + if (dummyOps != null) { + for (Operator dummyOp : dummyOps){ + dummyOp.setExecContext(execContext); + dummyOp.initialize(jconf, null); + } + } + mapOp.setOutputCollector(out); mapOp.setReporter(reporter); MapredContext.get().setReporter(reporter); @@ -124,10 +140,6 @@ private MRInput getMRInput(Map inputs) { @Override void run() throws IOException{ - if (inputs.size() != 1) { - throw new IllegalArgumentException("MapRecordProcessor expects single input" - + ", inputCount=" + inputs.size()); - } MRInput in = getMRInput(inputs); KeyValueReader reader = in.getReader(); @@ -186,6 +198,17 @@ void close(){ // detecting failed executions by exceptions thrown by the operator tree try { mapOp.close(abort); + + // Need to close the dummyOps as well. The operator pipeline + // is not considered "closed/done" unless all operators are + // done. For broadcast joins that includes the dummy parents. + List dummyOps = mapWork.getDummyOps(); + if (dummyOps != null) { + for (Operator dummyOp : dummyOps){ + dummyOp.close(abort); + } + } + if (isLogInfoEnabled) { logCloseInfo(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index eb39e5c..e0d838c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.ObjectCache; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; @@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.Deserializer; @@ -77,6 +79,8 @@ private Object keyObject = null; private BytesWritable groupKey; + private ReduceWork redWork; + List row = new ArrayList(Utilities.reduceFieldNameList.size()); @Override @@ -90,7 +94,7 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; - ReduceWork redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY); + redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY); if (redWork == null) { redWork = Utilities.getReduceWork(jconf); cache.cache(REDUCE_PLAN_KEY, redWork); @@ -134,6 +138,18 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in try { l4j.info(reducer.dump(0)); reducer.initialize(jconf, rowObjectInspector); + + // Initialization isn't finished until all parents of all operators + // are initialized. For broadcast joins that means initializing the + // dummy parent operators as well. 
+ List dummyOps = redWork.getDummyOps(); + if (dummyOps != null) { + for (Operator dummyOp : dummyOps){ + dummyOp.setExecContext(execContext); + dummyOp.initialize(jconf, null); + } + } + } catch (Throwable e) { abort = true; if (e instanceof OutOfMemoryError) { @@ -153,10 +169,6 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in @Override void run() throws IOException{ - if (inputs.size() != 1) { - throw new IllegalArgumentException("ReduceRecordProcessor expects single input" - + ", inputCount=" + inputs.size()); - } //TODO - changes this for joins ShuffledMergedInput in = (ShuffledMergedInput)inputs.values().iterator().next(); @@ -299,6 +311,16 @@ void close(){ } reducer.close(abort); + + // Need to close the dummyOps as well. The operator pipeline + // is not considered "closed/done" unless all operators are + // done. For broadcast joins that includes the dummy parents. + List dummyOps = redWork.getDummyOps(); + if (dummyOps != null) { + for (Operator dummyOp : dummyOps){ + dummyOp.close(abort); + } + } reportStats rps = new reportStats(reporter); reducer.preorderMap(rps); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java index 6953365..2ab13b1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java @@ -527,6 +527,7 @@ protected MapJoinOperator convertJoinToBucketMapJoin( SortBucketJoinProcCtx joinContext, ParseContext parseContext) throws SemanticException { MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin( + parseContext.getConf(), parseContext.getOpParseCtx(), joinOp, pGraphContext.getJoinContext().get(joinOp), diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 5f4a024..0589898 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -95,7 +95,7 @@ public Object process(Node nd, Stack stack, long inputSize = currInputStat.getNumberOfBytes(); if ((bigInputStat == null) || - ((bigInputStat != null) && + ((bigInputStat != null) && (inputSize > bigInputStat.getNumberOfBytes()))) { if (bigTableFound) { @@ -141,7 +141,7 @@ public Object process(Node nd, Stack stack, } if (bigTablePosition == -1) { - // all tables have size 0. We let the suffle join handle this case. + // all tables have size 0. We let the shuffle join handle this case. return null; } @@ -161,7 +161,7 @@ public Object process(Node nd, Stack stack, // convert to a map join operator with this information ParseContext parseContext = context.parseContext; MapJoinOperator mapJoinOp = MapJoinProcessor. 
- convertJoinOpMapJoinOp(parseContext.getOpParseCtx(), + convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true, false); Operator parentBigTableOp diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index 0a08bec..24934ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -237,13 +237,14 @@ private static String genMapJoinLocalWork(MapredWork newWork, MapJoinOperator ma * @return the alias to the big table * @throws SemanticException */ - public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos) + public static String genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork, + JoinOperator op, int mapJoinPos) throws SemanticException { LinkedHashMap, OpParseContext> opParseCtxMap = newWork.getMapWork().getOpParseCtxMap(); QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree(); // generate the map join operator; already checked the map join - MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op, + MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(conf, opParseCtxMap, op, newJoinTree, mapJoinPos, true, false); return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos); } @@ -316,7 +317,7 @@ private static void validateMapJoinTypes(Operator op) * are cached in memory * @param noCheckOuterJoin */ - public static MapJoinOperator convertMapJoin( + public static MapJoinOperator convertMapJoin(HiveConf conf, LinkedHashMap, OpParseContext> opParseCtxMap, JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) @@ -374,7 +375,7 @@ public static MapJoinOperator convertMapJoin( } // create the map-join operator - MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(opParseCtxMap, + MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin, validateMapJoinTree); @@ -395,7 +396,7 @@ public static MapJoinOperator convertMapJoin( return mapJoinOp; } - public static MapJoinOperator convertJoinOpMapJoinOp( + public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf, LinkedHashMap, OpParseContext> opParseCtxMap, JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) @@ -433,9 +434,6 @@ public static MapJoinOperator convertJoinOpMapJoinOp( if (src != null) { Operator parentOp = op.getParentOperators().get(pos); assert parentOp.getParentOperators().size() == 1; - Operator grandParentOp = - parentOp.getParentOperators().get(0); - oldReduceSinkParentOps.add(parentOp); } pos++; @@ -536,8 +534,8 @@ public static MapJoinOperator convertJoinOpMapJoinOp( } List outputColumnNames = op.getConf().getOutputColumnNames(); - TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils - .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); + TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, + PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); JoinCondDesc[] joinCondns = op.getConf().getConds(); MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns, @@ -589,14 +587,14 @@ public static MapJoinOperator 
convertJoinOpMapJoinOp( * are cached in memory * @param noCheckOuterJoin */ - public static MapJoinOperator convertSMBJoinToMapJoin( + public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, Map, OpParseContext> opParseCtxMap, SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin) throws SemanticException { // Create a new map join operator SMBJoinDesc smbJoinDesc = smbJoinOp.getConf(); List keyCols = smbJoinDesc.getKeys().get(Byte.valueOf((byte) 0)); - TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils + TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, PlanUtils .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); MapJoinDesc mapJoinDesc = new MapJoinDesc(smbJoinDesc.getKeys(), keyTableDesc, smbJoinDesc.getExprs(), @@ -644,8 +642,8 @@ public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator o LinkedHashMap, OpParseContext> opParseCtxMap = pctx .getOpParseCtx(); - MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos, - noCheckOuterJoin, true); + MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op, + joinTree, mapJoinPos, noCheckOuterJoin, true); // create a dummy select to select all columns genSelectPlan(pctx, mapJoinOp); return mapJoinOp; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index 9463b63..5f3a7ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -2,21 +2,29 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.GenTezProcContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; @@ -51,6 +59,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, context.mapJoinParentMap.put(mapJoinOp, parents); } + BaseWork myWork = null; + while (childOp != null) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) { /* @@ -63,9 +73,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, * */ - BaseWork myWork = context.operatorWorkMap.get(childOp); + myWork = context.operatorWorkMap.get(childOp); BaseWork parentWork = context.operatorWorkMap.get(parentRS); - + // set the link between mapjoin and parent 
vertex int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS); if (pos == -1) { @@ -97,8 +107,56 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, } } + // create the dummy operators + List> dummyOperators = + new ArrayList>(); + + // create a new operator: HashTableDummyOperator, which shares the table desc + HashTableDummyDesc desc = new HashTableDummyDesc(); + @SuppressWarnings("unchecked") + HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc); + TableDesc tbl; + + // need to create the correct table descriptor for key/value + RowSchema rowSchema = parentRS.getParentOperators().get(0).getSchema(); + tbl = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromRowSchema(rowSchema, "")); + dummyOp.getConf().setTbl(tbl); + + Map> keyExprMap = mapJoinOp.getConf().getKeys(); + List keyCols = keyExprMap.get(Byte.valueOf((byte) 0)); + StringBuffer keyOrder = new StringBuffer(); + for (ExprNodeDesc k: keyCols) { + keyOrder.append("+"); + } + TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils + .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString()); + mapJoinOp.getConf().setKeyTableDesc(keyTableDesc); + + // let the dummy op be the parent of mapjoin op + mapJoinOp.replaceParent(parentRS, dummyOp); + List> dummyChildren = + new ArrayList>(); + dummyChildren.add(mapJoinOp); + dummyOp.setChildOperators(dummyChildren); + dummyOperators.add(dummyOp); + // cut the operator tree so as to not retain connections from the parent RS downstream - parentRS.removeChild(mapJoinOp); + List> childOperators = parentRS.getChildOperators(); + int childIndex = childOperators.indexOf(mapJoinOp); + childOperators.remove(childIndex); + + // the "work" needs to know about the dummy operators. 
They have to be separately initialized + // at task startup + if (myWork != null) { + myWork.addDummyOp(dummyOp); + } else { + List> dummyList = dummyOperators; + if (context.linkChildOpWithDummyOp.containsKey(childOp)) { + dummyList = context.linkChildOpWithDummyOp.get(childOp); + } + dummyList.add(dummyOp); + context.linkChildOpWithDummyOp.put(childOp, dummyList); + } return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 2efa7c2..490d48d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -189,7 +189,8 @@ private int getPosition(MapWork work, Operator joinOp, // optimize this newWork given the big table position String bigTableAlias = - MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, bigTablePosition); + MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(), + newWork, newJoinOp, bigTablePosition); return new ObjectPair(newTask, bigTableAlias); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index 698b67b..0b41db8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -432,7 +432,8 @@ private MapJoinOperator getMapJoinOperator(MapRedTask task, opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp)); // generate the map join operator - return MapJoinProcessor.convertSMBJoinToMapJoin(opParseContextMap, newSMBJoinOp, + return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(), + opParseContextMap, newSMBJoinOp, joinTree, mapJoinPos, true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index 088fe79..d78f39a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -94,6 +94,9 @@ // what position in the mapjoin the different parent work items will have. 
public final Map>> mapJoinParentMap; + // remember the dummy ops we created + public final Map, List>> linkChildOpWithDummyOp; + @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, List> moveTask, List> rootTasks, @@ -111,5 +114,6 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext, this.linkOpWithWorkMap = new HashMap, List>(); this.operatorWorkMap = new HashMap, BaseWork>(); this.mapJoinParentMap = new HashMap>>(); + this.linkChildOpWithDummyOp = new HashMap, List>>(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 9233a9f..41fb7d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -118,7 +119,7 @@ public Object process(Node nd, Stack stack, GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink); // remember which parent belongs to which tag - reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), + reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), context.preceedingWork.getName()); tezWork.add(reduceWork); @@ -151,12 +152,12 @@ public Object process(Node nd, Stack stack, ReduceSinkOperator rs = (ReduceSinkOperator) operator; ReduceWork rWork = (ReduceWork) followingWork; GenMapRedUtils.setKeyAndValueDesc(rWork, rs); - + // remember which parent belongs to which tag rWork.getTagToInput().put(rs.getConf().getTag(), work.getName()); // add dependency between the two work items - tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator), + tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator), EdgeType.SIMPLE_EDGE); } @@ -200,6 +201,11 @@ public Object process(Node nd, Stack stack, context.operatorWorkMap.put(operator, work); List linkWorkList = context.linkOpWithWorkMap.get(operator); if (linkWorkList != null) { + if (context.linkChildOpWithDummyOp.containsKey(operator)) { + for (Operator dummy: context.linkChildOpWithDummyOp.get(operator)) { + work.addDummyOp((HashTableDummyOperator) dummy); + } + } for (BaseWork parentWork : linkWorkList) { tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 8654da6..e8c3145 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -19,10 +19,10 @@ package org.apache.hadoop.hive.ql.plan; import java.util.ArrayList; -import java.util.HashMap; +import java.util.LinkedList; import java.util.List; -import java.util.Map; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.Operator; /** @@ -32,6 +32,12 @@ @SuppressWarnings({"serial", "deprecation"}) public abstract class BaseWork extends AbstractOperatorDesc { + // dummyOps is a reference to all the HashTableDummy operators in the + // plan. These have to be separately initialized when we set up a task. + // Their function is mainly as root ops to give the mapjoin the correct + // schema info. 
+ List dummyOps; + public BaseWork() {} public BaseWork(String name) { @@ -58,6 +64,21 @@ public void setName(String name) { this.name = name; } + public List getDummyOps() { + return dummyOps; + } + + public void setDummyOps(List dummyOps) { + this.dummyOps = dummyOps; + } + + public void addDummyOp(HashTableDummyOperator dummyOp) { + if (dummyOps == null) { + dummyOps = new LinkedList(); + } + dummyOps.add(dummyOp); + } + protected abstract List> getAllRootOperators(); public List> getAllOperators() { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index 6c354d3..2283bae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -29,7 +29,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.exec.ColumnInfo; @@ -376,14 +378,34 @@ public static TableDesc getReduceKeyTableDesc(List fieldSchemas, /** * Generate the table descriptor for Map-side join key. */ - public static TableDesc getMapJoinKeyTableDesc(List fieldSchemas) { - return new TableDesc(SequenceFileInputFormat.class, - SequenceFileOutputFormat.class, Utilities.makeProperties("columns", - MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), - "columns.types", MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - serdeConstants.ESCAPE_CHAR, "\\", - serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); + public static TableDesc getMapJoinKeyTableDesc(Configuration conf, + List fieldSchemas) { + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_OPTIMIZE_TEZ)) { + // In tez we use a different way of transmitting the hash table. + // We basically use ReduceSinkOperators and set the transfer to + // be broadcast (instead of partitioned). As a consequence we use + // a different SerDe than in the MR mapjoin case. 
+ StringBuffer order = new StringBuffer(); + for (FieldSchema f: fieldSchemas) { + order.append("+"); + } + return new TableDesc( + SequenceFileInputFormat.class, SequenceFileOutputFormat.class, + Utilities.makeProperties(serdeConstants.LIST_COLUMNS, MetaStoreUtils + .getColumnNamesFromFieldSchema(fieldSchemas), + serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + serdeConstants.SERIALIZATION_SORT_ORDER, order.toString(), + serdeConstants.SERIALIZATION_LIB, BinarySortableSerDe.class.getName())); + } else { + return new TableDesc(SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties("columns", + MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), + "columns.types", MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + serdeConstants.ESCAPE_CHAR, "\\", + serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); + } } /** @@ -391,13 +413,14 @@ public static TableDesc getMapJoinKeyTableDesc(List fieldSchemas) { */ public static TableDesc getMapJoinValueTableDesc( List fieldSchemas) { - return new TableDesc(SequenceFileInputFormat.class, - SequenceFileOutputFormat.class, Utilities.makeProperties("columns", - MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), - "columns.types", MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - serdeConstants.ESCAPE_CHAR, "\\", - serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); + return new TableDesc(SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties( + serdeConstants.LIST_COLUMNS, MetaStoreUtils + .getColumnNamesFromFieldSchema(fieldSchemas), + serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + serdeConstants.ESCAPE_CHAR, "\\", + serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); } /**
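Note for readers of the patch: the heart of the new Tez broadcast-join path is the loop added to ql/exec/tez/HashTableLoader.java, which reads the serialized key/value pairs arriving over a broadcast edge and deserializes them into an in-memory hash table through the key and value SerDe contexts that MapJoinTableContainerSerDe now exposes. The snippet below is a condensed, illustrative sketch of that loop and is not part of the patch itself; the helper class name and the simplified error handling are invented for illustration, while the types and calls (HashMapWrapper, MapJoinKey.read, MapJoinRowContainer.read, getKeyContext/getValueContext, KeyValueReader) are taken from the hunks above.

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
import org.apache.tez.runtime.library.api.KeyValueReader;

// Illustrative sketch only -- condenses the loading loop added to HashTableLoader.
public final class BroadcastHashTableSketch {

  // Builds the in-memory hash table for one small table of a map join from the
  // key/value pairs delivered over a Tez broadcast edge.
  public static MapJoinTableContainer load(KeyValueReader kvReader,
      MapJoinTableContainerSerDe serde, int hashTableThreshold, float hashTableLoadFactor)
      throws HiveException {
    MapJoinTableContainer tableContainer =
        new HashMapWrapper(hashTableThreshold, hashTableLoadFactor);
    try {
      // Every broadcast record becomes one hash table entry. The key and the
      // row container are deserialized through the key/value SerDe contexts
      // exposed by the new getters on MapJoinTableContainerSerDe.
      while (kvReader.next()) {
        MapJoinKey key = new MapJoinKey();
        key.read(serde.getKeyContext(), (Writable) kvReader.getCurrentKey());
        MapJoinRowContainer values = new MapJoinRowContainer();
        values.read(serde.getValueContext(), (Writable) kvReader.getCurrentValue());
        tableContainer.put(key, values);
      }
    } catch (IOException e) {
      throw new HiveException(e);
    } catch (SerDeException e) {
      throw new HiveException(e);
    }
    return tableContainer;
  }
}

Two companion pieces of the patch make this loop work: PlanUtils.getMapJoinKeyTableDesc() now emits a BinarySortableSerDe-based key descriptor with an all-ascending sort order when ConfVars.HIVE_OPTIMIZE_TEZ is set, because the small tables travel through ReduceSink operators over a broadcast edge rather than through MapReduce local work; and the HashTableDummyOperator parents created in ReduceSinkMapJoinProc exist purely to give the map join its schema at task startup, which is why MapRecordProcessor and ReduceRecordProcessor initialize and close them alongside the regular operator tree.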