diff --git eclipse-templates/.classpath eclipse-templates/.classpath index 87ca207..d131007 100644 --- eclipse-templates/.classpath +++ eclipse-templates/.classpath @@ -50,6 +50,7 @@ + @@ -93,15 +94,15 @@ - - - + + + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 8633321..b077c39 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.SerDe; @@ -69,6 +70,46 @@ public MapJoinOperator(AbstractMapJoinOperator mjop) { } @Override + public void endGroup() throws HiveException { + LOG.debug("Ending group"); + + if (childOperators == null) { + return; + } + + if (fatalError) { + return; + } + + LOG.debug("Ending group for children:"); + for (Operator op : childOperators) { + op.endGroup(); + } + + LOG.debug("End group Done"); + } + + @Override + public void startGroup() throws HiveException { + LOG.debug("Starting group"); + + if (childOperators == null) { + return; + } + + if (fatalError) { + return; + } + + LOG.debug("Starting group for children:"); + for (Operator op : childOperators) { + op.startGroup(); + } + + LOG.debug("Start group Done"); + } + + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); @@ -126,7 +167,8 @@ public void generateMapMetaData() throws HiveException, SerDeException { private void loadHashTable() throws HiveException { - if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) { + if (this.getExecContext().getLocalWork() == null + || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) { if (hashTblInitedOnce) { return; } else { @@ -159,8 +201,8 @@ public void cleanUpInputFileChangedOp() throws HiveException { public void processOp(Object row, int tag) throws HiveException { try { if (firstRow) { - // generate the map metadata generateMapMetaData(); + loadHashTable(); firstRow = false; } alias = (byte)tag; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java index b2c5ddc..3700325 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java @@ -36,7 +36,7 @@ private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; private Object[] key; - + public MapJoinKey(Object[] key) { this.key = key; } @@ -57,8 +57,8 @@ public boolean hasAnyNulls(boolean[] nullsafes){ } return false; } - - + + @Override public int hashCode() { final int prime = 31; @@ -68,19 +68,24 @@ public int hashCode() { } @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } MapJoinKey other = (MapJoinKey) obj; - if (!Arrays.equals(key, other.key)) + if (!Arrays.equals(key, other.key)) { return false; + } return true; } + @SuppressWarnings("unchecked") 
- public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) + public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) throws IOException, SerDeException { SerDe serde = context.getSerDe(); container.readFields(in); @@ -92,8 +97,21 @@ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writab key = value.toArray(); } } - - public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) + + @SuppressWarnings("unchecked") + public void read(MapJoinObjectSerDeContext context, Writable container) throws SerDeException { + SerDe serde = context.getSerDe(); + List value = (List)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container), + serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); + + if(value == null) { + key = EMPTY_OBJECT_ARRAY; + } else { + key = value.toArray(); + } + } + + public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) throws IOException, SerDeException { SerDe serde = context.getSerDe(); ObjectInspector objectInspector = context.getStandardOI(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java index cab3db3..cb9279b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java @@ -37,7 +37,7 @@ @SuppressWarnings("deprecation") public class MapJoinRowContainer extends AbstractRowContainer> { private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; - + private final List> list; private int index; private byte aliasFilter = (byte) 0xff; @@ -45,7 +45,7 @@ public MapJoinRowContainer() { index = 0; list = new ArrayList>(1); - } + } @Override public void add(List t) { @@ -92,11 +92,11 @@ public void clear() { list.clear(); index = 0; } - + public byte getAliasFilter() { return aliasFilter; } - + public MapJoinRowContainer copy() { MapJoinRowContainer result = new MapJoinRowContainer(); for(List item : list) { @@ -104,15 +104,15 @@ public MapJoinRowContainer copy() { } return result; } - + @SuppressWarnings({"unchecked"}) - public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) + public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) throws IOException, SerDeException { clear(); SerDe serde = context.getSerDe(); long numRows = in.readLong(); for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) { - container.readFields(in); + container.readFields(in); List value = (List)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); if(value == null) { @@ -126,8 +126,23 @@ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writab } } } - - public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) + + public void read(MapJoinObjectSerDeContext context, Writable currentValue) throws SerDeException { + SerDe serde = context.getSerDe(); + List value = (List)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(currentValue), + serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); + if(value == null) { + add(toList(EMPTY_OBJECT_ARRAY)); + } else { + Object[] valuesArray = value.toArray(); + if (context.hasFilterTag()) { + aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 
1]).get(); + } + add(toList(valuesArray)); + } + } + + public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) throws IOException, SerDeException { SerDe serde = context.getSerDe(); ObjectInspector valueObjectInspector = context.getStandardOI(); @@ -136,7 +151,7 @@ public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) out.writeLong(numRows); for (List row = first(); row != null; row = next()) { serde.serialize(row.toArray(), valueObjectInspector).write(out); - ++numRowsWritten; + ++numRowsWritten; } if(numRows != size()) { throw new ConcurrentModificationException("Values was modifified while persisting"); @@ -145,7 +160,7 @@ public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) throw new IllegalStateException("Expected to write " + numRows + " but wrote " + numRowsWritten); } } - + private List toList(Object[] array) { return new NoCopyingArrayList(array); } @@ -154,7 +169,7 @@ public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) * so we don't care about copying in and out. */ private static class NoCopyingArrayList extends AbstractList { - private Object[] array; + private final Object[] array; public NoCopyingArrayList(Object[] array) { this.array = array; } @@ -167,9 +182,10 @@ public Object get(int index) { public int size() { return array.length; } - + + @Override public Object[] toArray() { return array; - } + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java index 06151d5..d9f99e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java @@ -32,16 +32,25 @@ @SuppressWarnings("deprecation") public class MapJoinTableContainerSerDe { - + private final MapJoinObjectSerDeContext keyContext; private final MapJoinObjectSerDeContext valueContext; + public MapJoinTableContainerSerDe(MapJoinObjectSerDeContext keyContext, MapJoinObjectSerDeContext valueContext) { this.keyContext = keyContext; this.valueContext = valueContext; } + + public MapJoinObjectSerDeContext getKeyContext() { + return keyContext; + } + public MapJoinObjectSerDeContext getValueContext() { + return valueContext; + } + @SuppressWarnings({"unchecked"}) - public MapJoinTableContainer load(ObjectInputStream in) + public MapJoinTableContainer load(ObjectInputStream in) throws HiveException { SerDe keySerDe = keyContext.getSerDe(); SerDe valueSerDe = valueContext.getSerDe(); @@ -49,7 +58,7 @@ public MapJoinTableContainer load(ObjectInputStream in) try { String name = in.readUTF(); Map metaData = (Map) in.readObject(); - tableContainer = create(name, metaData); + tableContainer = create(name, metaData); } catch (IOException e) { throw new HiveException("IO error while trying to create table container", e); } catch (ClassNotFoundException e) { @@ -57,7 +66,7 @@ public MapJoinTableContainer load(ObjectInputStream in) } try { Writable keyContainer = keySerDe.getSerializedClass().newInstance(); - Writable valueContainer = valueSerDe.getSerializedClass().newInstance(); + Writable valueContainer = valueSerDe.getSerializedClass().newInstance(); int numKeys = in.readInt(); for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) { MapJoinKey key = new MapJoinKey(); @@ -76,7 +85,7 @@ public MapJoinTableContainer load(ObjectInputStream in) public void persist(ObjectOutputStream out, 
MapJoinTableContainer tableContainer) throws HiveException { int numKeys = tableContainer.size(); - try { + try { out.writeUTF(tableContainer.getClass().getName()); out.writeObject(tableContainer.getMetaData()); out.writeInt(numKeys); @@ -95,14 +104,14 @@ public void persist(ObjectOutputStream out, MapJoinTableContainer tableContainer throw new ConcurrentModificationException("TableContainer was modified while persisting: " + tableContainer); } } - + public static void persistDummyTable(ObjectOutputStream out) throws IOException { MapJoinTableContainer tableContainer = new HashMapWrapper(); out.writeUTF(tableContainer.getClass().getName()); out.writeObject(tableContainer.getMetaData()); out.writeInt(tableContainer.size()); } - + private MapJoinTableContainer create(String name, Map metaData) throws HiveException { try { @SuppressWarnings("unchecked") diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 2ec5561..6a21439 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.IOException; import java.util.Map; import org.apache.commons.logging.Log; @@ -25,10 +26,17 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.io.Writable; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValueReader; /** * HashTableLoader for Tez constructs the hashtable from records read from @@ -49,8 +57,40 @@ public void load(ExecMapperContext context, byte posBigTable, MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { + TezContext tezContext = (TezContext) MapredContext.get(); Map parentToInput = desc.getParentToInput(); - } + for (int pos = 0; pos < mapJoinTables.length; pos++) { + if (pos == posBigTable) { + continue; + } + + LogicalInput input = tezContext.getInput(parentToInput.get(pos)); + + try { + System.out.println("XXX: Loading a logical input"); + + KeyValueReader kvReader = (KeyValueReader) input.getReader(); + + MapJoinTableContainer tableContainer = new HashMapWrapper(100000, 0.75f); + + while (kvReader.next()) { + MapJoinKey key = new MapJoinKey(); + key.read(mapJoinTableSerdes[pos].getKeyContext(), (Writable)kvReader.getCurrentKey()); + MapJoinRowContainer values = new MapJoinRowContainer(); + values.read(mapJoinTableSerdes[pos].getValueContext(), (Writable)kvReader.getCurrentValue()); + tableContainer.put(key, values); + } + + mapJoinTables[pos] = tableContainer; + } catch (IOException e) { + throw new HiveException(e); + } catch (SerDeException e) { + throw new HiveException(e); + } catch (Exception e) { + throw new HiveException(e); + } + } + } } diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 23400e4..a33145e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -18,20 +18,24 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.ObjectCache; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -53,6 +57,7 @@ private final ExecMapperContext execContext = new ExecMapperContext(); private boolean abort = false; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; + private MapWork mapWork; @Override void init(JobConf jconf, MRTaskReporter mrReporter, Map inputs, @@ -73,15 +78,15 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in execContext.setJc(jconf); // create map and fetch operators - MapWork mrwork = (MapWork) cache.retrieve(MAP_PLAN_KEY); - if (mrwork == null) { - mrwork = Utilities.getMapWork(jconf); - cache.cache(MAP_PLAN_KEY, mrwork); + mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); + if (mapWork == null) { + mapWork = Utilities.getMapWork(jconf); + cache.cache(MAP_PLAN_KEY, mapWork); } mapOp = new MapOperator(); // initialize map operator - mapOp.setConf(mrwork); + mapOp.setConf(mapWork); mapOp.setChildren(jconf); l4j.info(mapOp.dump(0)); @@ -91,6 +96,14 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in mapOp.initializeLocalWork(jconf); mapOp.initialize(jconf, null); + List dummyOps = mapWork.getDummyOps(); + if (dummyOps != null) { + for (Operator dummyOp : dummyOps){ + dummyOp.setExecContext(execContext); + dummyOp.initialize(jconf, null); + } + } + mapOp.setOutputCollector(out); mapOp.setReporter(reporter); MapredContext.get().setReporter(reporter); @@ -124,10 +137,6 @@ private MRInput getMRInput(Map inputs) { @Override void run() throws IOException{ - if (inputs.size() != 1) { - throw new IllegalArgumentException("MapRecordProcessor expects single input" - + ", inputCount=" + inputs.size()); - } MRInput in = getMRInput(inputs); KeyValueReader reader = in.getReader(); @@ -186,6 +195,13 @@ void close(){ // detecting failed executions by exceptions thrown by the operator tree try { mapOp.close(abort); + List dummyOps = mapWork.getDummyOps(); + if (dummyOps != null) { + for (Operator dummyOp : dummyOps){ + dummyOp.close(abort); + } + } + if (isLogInfoEnabled) { logCloseInfo(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index eb39e5c..e904f3b 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.ObjectCache; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; @@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.Deserializer; @@ -77,6 +79,8 @@ private Object keyObject = null; private BytesWritable groupKey; + private ReduceWork redWork; + List row = new ArrayList(Utilities.reduceFieldNameList.size()); @Override @@ -90,7 +94,7 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; - ReduceWork redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY); + redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY); if (redWork == null) { redWork = Utilities.getReduceWork(jconf); cache.cache(REDUCE_PLAN_KEY, redWork); @@ -134,6 +138,15 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in try { l4j.info(reducer.dump(0)); reducer.initialize(jconf, rowObjectInspector); + + List dummyOps = redWork.getDummyOps(); + if (dummyOps != null) { + for (Operator dummyOp : dummyOps){ + dummyOp.setExecContext(execContext); + dummyOp.initialize(jconf, null); + } + } + } catch (Throwable e) { abort = true; if (e instanceof OutOfMemoryError) { @@ -153,10 +166,6 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in @Override void run() throws IOException{ - if (inputs.size() != 1) { - throw new IllegalArgumentException("ReduceRecordProcessor expects single input" - + ", inputCount=" + inputs.size()); - } //TODO - changes this for joins ShuffledMergedInput in = (ShuffledMergedInput)inputs.values().iterator().next(); @@ -299,6 +308,12 @@ void close(){ } reducer.close(abort); + List dummyOps = redWork.getDummyOps(); + if (dummyOps != null) { + for (Operator dummyOp : dummyOps){ + dummyOp.close(abort); + } + } reportStats rps = new reportStats(reporter); reducer.preorderMap(rps); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java index 6953365..2ab13b1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java @@ -527,6 +527,7 @@ protected MapJoinOperator convertJoinToBucketMapJoin( SortBucketJoinProcCtx joinContext, ParseContext parseContext) throws SemanticException { MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin( + parseContext.getConf(), parseContext.getOpParseCtx(), joinOp, pGraphContext.getJoinContext().get(joinOp), diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java index 58a9b59..8b3f9d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java @@ -81,7 +81,8 @@ public ParseContext transform(ParseContext pactx) throws SemanticException { opToParseCtxMap = pGraphContext.getOpParseCtx(); // generate pruned column list for all relevant operators - ColumnPrunerProcCtx cppCtx = new ColumnPrunerProcCtx(opToParseCtxMap); + ColumnPrunerProcCtx cppCtx = new ColumnPrunerProcCtx(pGraphContext.getConf(), + opToParseCtxMap); // create a walker which walks the tree in a DFS manner while maintaining // the operator stack. The dispatcher diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java index db36151..7f8ff57 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.SelectOperator; @@ -40,14 +41,17 @@ */ public class ColumnPrunerProcCtx implements NodeProcessorCtx { + private final HiveConf conf; + private final Map, List> prunedColLists; private final HashMap, OpParseContext> opToParseCtxMap; private final Map>> joinPrunedColLists; - public ColumnPrunerProcCtx( + public ColumnPrunerProcCtx(HiveConf conf, HashMap, OpParseContext> opToParseContextMap) { + this.conf = conf; prunedColLists = new HashMap, List>(); opToParseCtxMap = opToParseContextMap; joinPrunedColLists = new HashMap>>(); @@ -57,6 +61,10 @@ public ColumnPrunerProcCtx( return joinPrunedColLists; } + public HiveConf getConf() { + return conf; + } + /** * @return the prunedColLists */ diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index 0798470..8fe12fe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -907,8 +907,8 @@ private static void pruneJoinOperator(NodeProcessorCtx ctx, keyOrder.append("+"); } - TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); + TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(cppCtx.getConf(), + PlanUtils.getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); valueTableDescs.add(valueTableDesc); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 5f4a024..0589898 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -95,7 +95,7 @@ public Object process(Node nd, Stack stack, long inputSize = currInputStat.getNumberOfBytes(); if ((bigInputStat == null) || - ((bigInputStat != null) && + ((bigInputStat != null) && (inputSize > bigInputStat.getNumberOfBytes()))) { if (bigTableFound) { @@ -141,7 +141,7 @@ public Object process(Node nd, Stack stack, } if (bigTablePosition == -1) { - // all tables have size 0. We let the suffle join handle this case. + // all tables have size 0. We let the shuffle join handle this case. 
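[Reviewer aside on the ConvertJoinMapJoin hunk above; the hunk continues immediately after this note with the early return.] The selection loop keeps the parent with the largest reported byte count as the big (streamed) table and falls back to the shuffle join when every input reports zero bytes. A minimal standalone sketch of that heuristic, assuming the per-input sizes are already available as plain longs (the method and variable names here are illustrative, not part of the patch):

    // Sketch only: choose the big-table position by reported input size.
    // Returns -1 when no input reports any bytes, i.e. leave the join to the
    // shuffle path (the "return null" branch in the hunk that follows).
    static int pickBigTablePosition(long[] inputSizes) {
      int bigTablePosition = -1;
      long bigTableSize = 0;
      for (int pos = 0; pos < inputSizes.length; pos++) {
        if (inputSizes[pos] > bigTableSize) {
          bigTableSize = inputSizes[pos];
          bigTablePosition = pos;
        }
      }
      return bigTablePosition;
    }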
return null; } @@ -161,7 +161,7 @@ public Object process(Node nd, Stack stack, // convert to a map join operator with this information ParseContext parseContext = context.parseContext; MapJoinOperator mapJoinOp = MapJoinProcessor. - convertJoinOpMapJoinOp(parseContext.getOpParseCtx(), + convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true, false); Operator parentBigTableOp diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index 0a08bec..1a3dce2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -237,13 +237,14 @@ private static String genMapJoinLocalWork(MapredWork newWork, MapJoinOperator ma * @return the alias to the big table * @throws SemanticException */ - public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos) + public static String genMapJoinOpAndLocalWork(HiveConf conf, + MapredWork newWork, JoinOperator op, int mapJoinPos) throws SemanticException { LinkedHashMap, OpParseContext> opParseCtxMap = newWork.getMapWork().getOpParseCtxMap(); QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree(); // generate the map join operator; already checked the map join - MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op, + MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(conf, opParseCtxMap, op, newJoinTree, mapJoinPos, true, false); return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos); } @@ -316,7 +317,7 @@ private static void validateMapJoinTypes(Operator op) * are cached in memory * @param noCheckOuterJoin */ - public static MapJoinOperator convertMapJoin( + public static MapJoinOperator convertMapJoin(HiveConf conf, LinkedHashMap, OpParseContext> opParseCtxMap, JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) @@ -374,7 +375,7 @@ public static MapJoinOperator convertMapJoin( } // create the map-join operator - MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(opParseCtxMap, + MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin, validateMapJoinTree); @@ -395,7 +396,7 @@ public static MapJoinOperator convertMapJoin( return mapJoinOp; } - public static MapJoinOperator convertJoinOpMapJoinOp( + public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf, LinkedHashMap, OpParseContext> opParseCtxMap, JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) @@ -433,9 +434,6 @@ public static MapJoinOperator convertJoinOpMapJoinOp( if (src != null) { Operator parentOp = op.getParentOperators().get(pos); assert parentOp.getParentOperators().size() == 1; - Operator grandParentOp = - parentOp.getParentOperators().get(0); - oldReduceSinkParentOps.add(parentOp); } pos++; @@ -504,10 +502,10 @@ public static MapJoinOperator convertJoinOpMapJoinOp( keyOrder.append("+"); } - TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); - TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue")); + TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(hconf, + 
PlanUtils.getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); + TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(hconf, + PlanUtils.getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue")); valueTableDescs.add(valueTableDesc); valueFiltedTableDescs.add(valueFilteredTableDesc); @@ -536,8 +534,8 @@ public static MapJoinOperator convertJoinOpMapJoinOp( } List outputColumnNames = op.getConf().getOutputColumnNames(); - TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils - .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); + TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, + PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); JoinCondDesc[] joinCondns = op.getConf().getConds(); MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns, @@ -589,14 +587,14 @@ public static MapJoinOperator convertJoinOpMapJoinOp( * are cached in memory * @param noCheckOuterJoin */ - public static MapJoinOperator convertSMBJoinToMapJoin( + public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, Map, OpParseContext> opParseCtxMap, SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin) throws SemanticException { // Create a new map join operator SMBJoinDesc smbJoinDesc = smbJoinOp.getConf(); List keyCols = smbJoinDesc.getKeys().get(Byte.valueOf((byte) 0)); - TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils + TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, PlanUtils .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); MapJoinDesc mapJoinDesc = new MapJoinDesc(smbJoinDesc.getKeys(), keyTableDesc, smbJoinDesc.getExprs(), @@ -644,8 +642,8 @@ public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator o LinkedHashMap, OpParseContext> opParseCtxMap = pctx .getOpParseCtx(); - MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos, - noCheckOuterJoin, true); + MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op, + joinTree, mapJoinPos, noCheckOuterJoin, true); // create a dummy select to select all columns genSelectPlan(pctx, mapJoinOp); return mapJoinOp; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index 9463b63..92f8bb1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -2,21 +2,29 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.GenTezProcContext; import org.apache.hadoop.hive.ql.parse.SemanticException; 
import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; @@ -51,6 +59,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, context.mapJoinParentMap.put(mapJoinOp, parents); } + BaseWork myWork = null; + while (childOp != null) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) { /* @@ -63,9 +73,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, * */ - BaseWork myWork = context.operatorWorkMap.get(childOp); + myWork = context.operatorWorkMap.get(childOp); BaseWork parentWork = context.operatorWorkMap.get(parentRS); - + // set the link between mapjoin and parent vertex int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS); if (pos == -1) { @@ -97,8 +107,57 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, } } + // create the dummy operators + List> dummyOperators = + new ArrayList>(); + + // create a new operator: HashTableDummyOperator, which shares the table desc + HashTableDummyDesc desc = new HashTableDummyDesc(); + @SuppressWarnings("unchecked") + HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc); + TableDesc tbl; + + // need to create the correct table descriptor for key/value + RowSchema rowSchema = parentRS.getParentOperators().get(0).getSchema(); + tbl = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromRowSchema(rowSchema, "")); + dummyOp.getConf().setTbl(tbl); + + Map> keyExprMap = mapJoinOp.getConf().getKeys(); + List keyCols = keyExprMap.get(Byte.valueOf((byte) 0)); + StringBuffer keyOrder = new StringBuffer(); + for (ExprNodeDesc k: keyCols) { + keyOrder.append("+"); + } + TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils + .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString()); + mapJoinOp.getConf().setKeyTableDesc(keyTableDesc); + + // let the dummy op be the parent of mapjoin op + mapJoinOp.replaceParent(parentRS, dummyOp); + List> dummyChildren = + new ArrayList>(); + dummyChildren.add(mapJoinOp); + dummyOp.setChildOperators(dummyChildren); + // add this dummy op to the dummy operator list + dummyOperators.add(dummyOp); + // cut the operator tree so as to not retain connections from the parent RS downstream - parentRS.removeChild(mapJoinOp); + List> childOperators = parentRS.getChildOperators(); + int childIndex = childOperators.indexOf(mapJoinOp); + childOperators.remove(childIndex); + + // the "work" needs to know about the dummy operators.
They have to be separately initialized + // at task startup + if (myWork != null) { + myWork.addDummyOp(dummyOp); + } else { + List> dummyList = dummyOperators; + if (context.linkChildOpWithDummyOp.containsKey(childOp)) { + dummyList = context.linkChildOpWithDummyOp.get(childOp); + } + dummyList.add(dummyOp); + context.linkChildOpWithDummyOp.put(childOp, dummyList); + } return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 2efa7c2..490d48d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -189,7 +189,8 @@ private int getPosition(MapWork work, Operator joinOp, // optimize this newWork given the big table position String bigTableAlias = - MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, bigTablePosition); + MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(), + newWork, newJoinOp, bigTablePosition); return new ObjectPair(newTask, bigTableAlias); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index 698b67b..0b41db8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -432,7 +432,8 @@ private MapJoinOperator getMapJoinOperator(MapRedTask task, opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp)); // generate the map join operator - return MapJoinProcessor.convertSMBJoinToMapJoin(opParseContextMap, newSMBJoinOp, + return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(), + opParseContextMap, newSMBJoinOp, joinTree, mapJoinPos, true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index 088fe79..d78f39a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -94,6 +94,9 @@ // what position in the mapjoin the different parent work items will have. 
public final Map>> mapJoinParentMap; + // remember the dummy ops we created + public final Map, List>> linkChildOpWithDummyOp; + @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, List> moveTask, List> rootTasks, @@ -111,5 +114,6 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext, this.linkOpWithWorkMap = new HashMap, List>(); this.operatorWorkMap = new HashMap, BaseWork>(); this.mapJoinParentMap = new HashMap>>(); + this.linkChildOpWithDummyOp = new HashMap, List>>(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 9233a9f..41fb7d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -118,7 +119,7 @@ public Object process(Node nd, Stack stack, GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink); // remember which parent belongs to which tag - reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), + reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), context.preceedingWork.getName()); tezWork.add(reduceWork); @@ -151,12 +152,12 @@ public Object process(Node nd, Stack stack, ReduceSinkOperator rs = (ReduceSinkOperator) operator; ReduceWork rWork = (ReduceWork) followingWork; GenMapRedUtils.setKeyAndValueDesc(rWork, rs); - + // remember which parent belongs to which tag rWork.getTagToInput().put(rs.getConf().getTag(), work.getName()); // add dependency between the two work items - tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator), + tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator), EdgeType.SIMPLE_EDGE); } @@ -200,6 +201,11 @@ public Object process(Node nd, Stack stack, context.operatorWorkMap.put(operator, work); List linkWorkList = context.linkOpWithWorkMap.get(operator); if (linkWorkList != null) { + if (context.linkChildOpWithDummyOp.containsKey(operator)) { + for (Operator dummy: context.linkChildOpWithDummyOp.get(operator)) { + work.addDummyOp((HashTableDummyOperator) dummy); + } + } for (BaseWork parentWork : linkWorkList) { tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 8654da6..828ceb7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -19,10 +19,10 @@ package org.apache.hadoop.hive.ql.plan; import java.util.ArrayList; -import java.util.HashMap; +import java.util.LinkedList; import java.util.List; -import java.util.Map; +import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.Operator; /** @@ -32,6 +32,8 @@ @SuppressWarnings({"serial", "deprecation"}) public abstract class BaseWork extends AbstractOperatorDesc { + List dummyOps; + public BaseWork() {} public BaseWork(String name) { @@ -58,6 +60,21 @@ public void setName(String name) { this.name = name; } + public List getDummyOps() { + return dummyOps; + } + + public void setDummyOps(List dummyOps) { + this.dummyOps = dummyOps; + } + + 
public void addDummyOp(HashTableDummyOperator dummyOp) { + if (dummyOps == null) { + dummyOps = new LinkedList(); + } + dummyOps.add(dummyOp); + } + protected abstract List> getAllRootOperators(); public List> getAllOperators() { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index 6c354d3..1bcfa25 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -29,7 +29,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.exec.ColumnInfo; @@ -376,28 +378,55 @@ public static TableDesc getReduceKeyTableDesc(List fieldSchemas, /** * Generate the table descriptor for Map-side join key. */ - public static TableDesc getMapJoinKeyTableDesc(List fieldSchemas) { - return new TableDesc(SequenceFileInputFormat.class, - SequenceFileOutputFormat.class, Utilities.makeProperties("columns", - MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), - "columns.types", MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - serdeConstants.ESCAPE_CHAR, "\\", - serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); + public static TableDesc getMapJoinKeyTableDesc(Configuration conf, + List fieldSchemas) { + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_OPTIMIZE_TEZ)) { + StringBuffer order = new StringBuffer(); + for (FieldSchema f: fieldSchemas) { + order.append("+"); + } + return new TableDesc( + SequenceFileInputFormat.class, SequenceFileOutputFormat.class, + Utilities.makeProperties(serdeConstants.LIST_COLUMNS, MetaStoreUtils + .getColumnNamesFromFieldSchema(fieldSchemas), + serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + serdeConstants.SERIALIZATION_SORT_ORDER, order.toString(), + serdeConstants.SERIALIZATION_LIB, BinarySortableSerDe.class.getName())); + } else { + return new TableDesc(SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties("columns", + MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), + "columns.types", MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + serdeConstants.ESCAPE_CHAR, "\\", + serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); + } } /** * Generate the table descriptor for Map-side join key. 
*/ - public static TableDesc getMapJoinValueTableDesc( + public static TableDesc getMapJoinValueTableDesc(Configuration conf, List fieldSchemas) { - return new TableDesc(SequenceFileInputFormat.class, - SequenceFileOutputFormat.class, Utilities.makeProperties("columns", - MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), - "columns.types", MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - serdeConstants.ESCAPE_CHAR, "\\", - serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_OPTIMIZE_TEZ)) { + return new TableDesc(SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties( + serdeConstants.LIST_COLUMNS, MetaStoreUtils + .getColumnNamesFromFieldSchema(fieldSchemas), + serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + serdeConstants.ESCAPE_CHAR, "\\", + serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); + } else { + return new TableDesc(SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties("columns", + MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), + "columns.types", MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + serdeConstants.ESCAPE_CHAR, "\\", + serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName())); + } } /**
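[Closing reviewer note on the PlanUtils hunks above.] With hive.optimize.tez enabled, getMapJoinKeyTableDesc now emits a BinarySortableSerDe descriptor carrying an all-ascending serialization.sort.order, while getMapJoinValueTableDesc keeps LazyBinarySerDe in both branches and only switches to the serdeConstants column property names. A short usage sketch, assuming a HiveConf and a key column list are already in scope (keyCols and the surrounding setup are illustrative assumptions, not part of the patch):

    // Assumed illustration of exercising the new Tez branch of getMapJoinKeyTableDesc.
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ, true);
    TableDesc keyDesc = PlanUtils.getMapJoinKeyTableDesc(conf,
        PlanUtils.getFieldSchemasFromColumnList(keyCols, "mapjoinkey")); // keyCols: assumed key expression list
    // keyDesc.getProperties() should now carry serialization.sort.order = "++..."
    // and serialization.lib = BinarySortableSerDe, per the Tez branch above; without
    // the flag it falls back to the original LazyBinarySerDe/escape-char properties.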