diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index eff4d30..296716f 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2174,7 +2174,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null), - "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."); + "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."), + HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", false, + "Enable memory manager for tez") + ; public final String varname; private final String defaultExpr; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 9867739..77aeaa0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -383,6 +383,11 @@ public static FastBitSet groupingSet2BitSet(int value) { newKeys = keyWrapperFactory.getKeyWrapper(); + maxMemory = conf.getMemoryNeeded(); + if (maxMemory == 0) { + memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + } firstRow = true; // estimate the number of hash table entries based on the size of each // entry. Since the size of a entry @@ -390,8 +395,7 @@ public static FastBitSet groupingSet2BitSet(int value) { if (hashAggr) { computeMaxEntriesHashAggr(hconf); } - memoryMXBean = ManagementFactory.getMemoryMXBean(); - maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + memoryThreshold = this.getConf().getMemoryThreshold(); return result; } @@ -407,7 +411,7 @@ public static FastBitSet groupingSet2BitSet(int value) { **/ private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException { float memoryPercentage = this.getConf().getGroupByMemoryUsage(); - maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory()); + maxHashTblMemory = (long) (memoryPercentage * maxMemory); estimateRowSize(); } @@ -965,7 +969,7 @@ private void flushHashTable(boolean complete) throws HiveException { hashAggregations.clear(); hashAggregations = null; if (isLogInfoEnabled) { - LOG.info("Hash Table completed flushed"); + LOG.info("Hash Table completed flushed"); } return; } @@ -983,9 +987,9 @@ private void flushHashTable(boolean complete) throws HiveException { iter.remove(); numDel++; if (numDel * 10 >= oldSize) { - if (isLogInfoEnabled) { - LOG.info("Hash Table flushed: new size = " + hashAggregations.size()); - } + if (isLogInfoEnabled) { + LOG.info("Hash Table flushed: new size = " + hashAggregations.size()); + } return; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index d5ea96a..4d49f9f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -645,7 +645,6 @@ public void initializeContexts() { } public Deserializer getCurrentDeserializer() { - return currentCtxs[0].deserializer; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java 
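The HiveConf and GroupByOperator hunks above introduce the central pattern of this patch: an operator first asks its descriptor for a planner-assigned budget via getMemoryNeeded() and only falls back to the JVM heap when no budget was assigned, and the hash-aggregation limit becomes a fraction of that budget rather than of Runtime.getRuntime().maxMemory(). A minimal sketch of the fallback, using only names that appear in the patch plus an illustrative wrapper class:

    import java.lang.management.ManagementFactory;

    final class GroupByMemorySketch {
      // memoryNeeded: planner-assigned budget from the operator descriptor (0 when unset)
      // groupByMemoryUsage: the existing hash-aggregation memory fraction
      static long maxHashTableMemory(long memoryNeeded, float groupByMemoryUsage) {
        long maxMemory = memoryNeeded;
        if (maxMemory == 0) {
          // old behaviour: budget against the whole heap
          maxMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
        }
        return (long) (groupByMemoryUsage * maxMemory);
      }
    }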
ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java index f93b420..7b19cc5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java @@ -31,40 +31,44 @@ public class PTFTopNHash extends TopNHash { - + protected float memUsage; protected boolean isMapGroupBy; private Map partitionHeaps; private TopNHash largestPartition; private boolean prevIndexPartIsNull; private Set indexesWithNullPartKey; - - public void initialize( - int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) { - super.initialize(topN, memUsage, isMapGroupBy, collector); + private long maxMemory; + + @Override + public void initialize(int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, + long maxMemory) { + super.initialize(topN, memUsage, isMapGroupBy, collector, maxMemory); this.isMapGroupBy = isMapGroupBy; this.memUsage = memUsage; + this.maxMemory = maxMemory; partitionHeaps = new HashMap(); indexesWithNullPartKey = new HashSet(); } - + + @Override public int tryStoreKey(HiveKey key, boolean partColsIsNull) throws HiveException, IOException { prevIndexPartIsNull = partColsIsNull; return _tryStoreKey(key, partColsIsNull, -1); } - + private void updateLargest(TopNHash p) { if ( largestPartition == null || largestPartition.usage < p.usage) { largestPartition = p; } } - + private void findLargest() { for(TopNHash p : partitionHeaps.values() ) { updateLargest(p); } } - + public int _tryStoreKey(HiveKey key, boolean partColsIsNull, int batchIndex) throws HiveException, IOException { if (!isEnabled) { return FORWARD; // short-circuit quickly - forward all rows @@ -76,7 +80,7 @@ public int _tryStoreKey(HiveKey key, boolean partColsIsNull, int batchIndex) thr TopNHash partHeap = partitionHeaps.get(pk); if ( partHeap == null ) { partHeap = new TopNHash(); - partHeap.initialize(topN, memUsage, isMapGroupBy, collector); + partHeap.initialize(topN, memUsage, isMapGroupBy, collector, maxMemory); if ( batchIndex >= 0 ) { partHeap.startVectorizedBatch(batchSize); } @@ -101,7 +105,8 @@ public int _tryStoreKey(HiveKey key, boolean partColsIsNull, int batchIndex) thr } return r; } - + + @Override public void storeValue(int index, int hashCode, BytesWritable value, boolean vectorized) { Key pk = new Key(prevIndexPartIsNull, hashCode); TopNHash partHeap = partitionHeaps.get(pk); @@ -110,14 +115,16 @@ public void storeValue(int index, int hashCode, BytesWritable value, boolean vec usage = usage + partHeap.usage; updateLargest(partHeap); } - + + @Override public void flush() throws HiveException { if (!isEnabled || (topN == 0)) return; for(TopNHash partHash : partitionHeaps.values()) { partHash.flush(); } } - + + @Override public int startVectorizedBatch(int size) throws IOException, HiveException { if (!isEnabled) { return FORWARD; // short-circuit quickly - forward all rows @@ -137,7 +144,8 @@ public int startVectorizedBatch(int size) throws IOException, HiveException { indexesWithNullPartKey.clear(); return 0; } - + + @Override public void tryStoreVectorizedKey(HiveKey key, boolean partColsIsNull, int batchIndex) throws HiveException, IOException { _tryStoreKey(key, partColsIsNull, batchIndex); @@ -146,39 +154,43 @@ public void tryStoreVectorizedKey(HiveKey key, boolean partColsIsNull, int batch } batchIndexToResult[batchIndex] = key.hashCode(); } - + + @Override public int getVectorizedBatchResult(int batchIndex) { prevIndexPartIsNull = indexesWithNullPartKey.contains(batchIndex); Key pk = new 
Key(prevIndexPartIsNull, batchIndexToResult[batchIndex]); TopNHash partHeap = partitionHeaps.get(pk); return partHeap.getVectorizedBatchResult(batchIndex); } - + + @Override public HiveKey getVectorizedKeyToForward(int batchIndex) { prevIndexPartIsNull = indexesWithNullPartKey.contains(batchIndex); Key pk = new Key(prevIndexPartIsNull, batchIndexToResult[batchIndex]); TopNHash partHeap = partitionHeaps.get(pk); return partHeap.getVectorizedKeyToForward(batchIndex); } - + + @Override public int getVectorizedKeyDistLength(int batchIndex) { prevIndexPartIsNull = indexesWithNullPartKey.contains(batchIndex); Key pk = new Key(prevIndexPartIsNull, batchIndexToResult[batchIndex]); TopNHash partHeap = partitionHeaps.get(pk); return partHeap.getVectorizedKeyDistLength(batchIndex); } - + + @Override public int getVectorizedKeyHashCode(int batchIndex) { prevIndexPartIsNull = indexesWithNullPartKey.contains(batchIndex); Key pk = new Key(prevIndexPartIsNull, batchIndexToResult[batchIndex]); TopNHash partHeap = partitionHeaps.get(pk); return partHeap.getVectorizedKeyHashCode(batchIndex); } - + static class Key { boolean isNull; int hashCode; - + public Key(boolean isNull, int hashCode) { super(); this.isNull = isNull; @@ -205,11 +217,11 @@ public boolean equals(Object obj) { return false; return true; } - + @Override public String toString() { return "" + hashCode + "," + isNull; } - + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 859a28f..247aea2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -86,6 +86,7 @@ private boolean skipTag = false; private transient InspectableObject tempInspectableObject = new InspectableObject(); private transient int[] valueIndex; // index for value(+ from keys, - from values) + protected transient long maxMem; protected transient OutputCollector out; /** @@ -236,8 +237,12 @@ float memUsage = conf.getTopNMemoryUsage(); if (limit >= 0 && memUsage > 0) { + maxMem = conf.getMemoryNeeded(); + if (maxMem == 0) { + maxMem = Runtime.getRuntime().maxMemory(); + } reducerHash = conf.isPTFReduceSink() ? 
new PTFTopNHash() : reducerHash; - reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this); + reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this, maxMem); } useUniformHash = conf.getReducerTraits().contains(UNIFORM); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java index 484006a..f830685 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java @@ -82,6 +82,7 @@ protected boolean isEnabled = false; private final Comparator C = new Comparator() { + @Override public int compare(Integer o1, Integer o2) { byte[] key1 = keys[o1]; byte[] key2 = keys[o2]; @@ -92,7 +93,7 @@ public int compare(Integer o1, Integer o2) { }; public void initialize( - int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) { + int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, long maxMemory) { assert topN >= 0 && memUsage > 0; assert !this.isEnabled; this.isEnabled = false; @@ -104,7 +105,7 @@ public void initialize( } // limit * 64 : compensation of arrays for key/value/hashcodes - this.threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - topN * 64; + this.threshold = (long) (memUsage * maxMemory) - topN * 64; if (threshold < 0) { return; } @@ -202,7 +203,7 @@ public void tryStoreVectorizedKey(HiveKey key, boolean partColsIsNull, int batch Integer collisionIndex = indexes.store(index); if (null != collisionIndex) { /* - * since there is a collision index will be used for the next value + * since there is a collision index will be used for the next value * so have the map point back to original index. */ if ( indexes instanceof HashForGroup ) { @@ -286,7 +287,7 @@ public int getVectorizedKeyDistLength(int batchIndex) { public int getVectorizedKeyHashCode(int batchIndex) { return hashes[batchIndexToResult[batchIndex]]; } - + /** * Stores the value for the key in the heap. * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result. @@ -377,7 +378,7 @@ private void flushInternal() throws IOException, HiveException { } excluded = 0; } - + private interface IndexStore { int size(); /** @@ -395,21 +396,25 @@ private void flushInternal() throws IOException, HiveException { private class HashForRow implements IndexStore { private final MinMaxPriorityQueue indexes = MinMaxPriorityQueue.orderedBy(C).create(); + @Override public int size() { return indexes.size(); } // returns null always + @Override public Integer store(int index) { boolean result = indexes.add(index); assert result; return null; } + @Override public int removeBiggest() { return indexes.removeLast(); } + @Override public Iterable indexes() { Integer[] array = indexes.toArray(new Integer[indexes.size()]); Arrays.sort(array, 0, array.length, C); @@ -425,21 +430,25 @@ public int removeBiggest() { // TreeSet anyway uses TreeMap; so use plain TreeMap to be able to get value in collisions. 
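TopNHash.initialize() above now receives its memory bound from the caller instead of reading Runtime.getRuntime().maxMemory() itself: ReduceSinkOperator passes either the planner budget or, failing that, the JVM maximum, and PTFTopNHash forwards the same bound to every per-partition heap it creates. A sketch of the resulting threshold arithmetic, with the class name invented for illustration:

    final class TopNThresholdSketch {
      // memUsage: top-N memory fraction from the ReduceSinkDesc; topN: the pushed-down limit
      static long threshold(float memUsage, int topN, long memoryNeeded) {
        long maxMemory = memoryNeeded != 0 ? memoryNeeded : Runtime.getRuntime().maxMemory();
        // limit * 64 compensates for the key/value/hashcode arrays, as in the patch
        return (long) (memUsage * maxMemory) - topN * 64L;
      }
    }

As in the original code, a negative threshold simply leaves the top-N hash disabled and all rows are forwarded.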
private final TreeMap indexes = new TreeMap(C); + @Override public int size() { return indexes.size(); } // returns false if index already exists in map + @Override public Integer store(int index) { return indexes.put(index, index); } + @Override public int removeBiggest() { Integer last = indexes.lastKey(); indexes.remove(last); return last; } + @Override public Iterable indexes() { return indexes.keySet(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 2ba622e..2530503 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -118,7 +118,7 @@ * Write buffers for keys and values. For the description of the structure above, think * of this as one infinite byte buffer. */ - private WriteBuffers writeBuffers; + private final WriteBuffers writeBuffers; private final float loadFactor; @@ -144,7 +144,9 @@ * so we'd stop earlier on read collision. Need to profile on real queries. */ private long[] refs; - private int startingHashBitCount, hashBitCount; + private final int startingHashBitCount; + + private int hashBitCount; private int metricPutConflict = 0, metricGetConflict = 0, metricExpands = 0, metricExpandsMs = 0; @@ -153,6 +155,8 @@ /** 8 Gb of refs is the max capacity if memory limit is not specified. If someone has 100s of * Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */ private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024; + private long memMax = 0L; + private long memUsed = 0L; public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize, long memUsage) { @@ -164,18 +168,20 @@ public BytesBytesMultiHashMap(int initialCapacity, ? initialCapacity : nextHighestPowerOfTwo(initialCapacity); // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check. int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY - : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8); + : (int)Math.min(DEFAULT_MAX_CAPACITY, memUsage / 8); if (maxCapacity < initialCapacity || initialCapacity <= 0) { // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows initialCapacity = (Long.bitCount(maxCapacity) == 1) ? maxCapacity : nextLowestPowerOfTwo(maxCapacity); } + this.memMax = maxCapacity; validateCapacity(initialCapacity); startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity); this.loadFactor = loadFactor; refs = new long[initialCapacity]; writeBuffers = new WriteBuffers(wbSize, MAX_WB_SIZE); + memUsed = refs.length * (Long.SIZE / 8) + wbSize; resizeThreshold = (int)(initialCapacity * this.loadFactor); } @@ -185,7 +191,7 @@ public BytesBytesMultiHashMap(int initialCapacity, } public class ThreadSafeGetter { - private WriteBuffers.Position position = new WriteBuffers.Position(); + private final WriteBuffers.Position position = new WriteBuffers.Position(); public byte getValueResult(byte[] key, int offset, int length, BytesBytesMultiHashMap.Result hashMapResult) { return BytesBytesMultiHashMap.this.getValueResult(key, offset, length, hashMapResult, position); @@ -227,7 +233,7 @@ public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { private long readIndex; // A reference to the current row. 
- private WriteBuffers.ByteSegmentRef byteSegmentRef; + private final WriteBuffers.ByteSegmentRef byteSegmentRef; public Result() { hasRows = false; @@ -301,8 +307,8 @@ public void set(BytesBytesMultiHashMap hashMap, long firstOffset, boolean hasLis } /** - * Read the current value. - * + * Read the current value. + * * @return * The ByteSegmentRef to the current value read. */ @@ -752,7 +758,7 @@ private void expandAndRehash() { long capacity = refs.length << 1; expandAndRehashImpl(capacity); } - + private void expandAndRehashImpl(long capacity) { long expandTime = System.currentTimeMillis(); final long[] oldRefs = refs; @@ -781,6 +787,10 @@ private void expandAndRehashImpl(long capacity) { int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode); maxSteps = Math.max(probeSteps, maxSteps); } + memUsed = memUsed - this.refs.length * (Long.SIZE / 8) + newRefs.length * (Long.SIZE / 8); + if (memUsed > memMax) { + LOG.warn("Hash table occupying more memory: " + memUsed + " than allocated: " + memMax); + } this.refs = newRefs; this.largestNumberOfSteps = maxSteps; this.hashBitCount = newHashBitCount; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index f773cb9..27f2f19 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -344,58 +344,132 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration Map partitionerConf; EdgeType edgeType = edgeProp.getEdgeType(); - switch (edgeType) { - case BROADCAST_EDGE: - UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig - .newBuilder(keyClass, valClass) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - return et1Conf.createDefaultBroadcastEdgeProperty(); - case CUSTOM_EDGE: - assert partitionerClassName != null; - partitionerConf = createPartitionerConf(partitionerClassName, conf); - UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - EdgeManagerPluginDescriptor edgeDesc = - EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); - CustomEdgeConfiguration edgeConf = - new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); - DataOutputBuffer dob = new DataOutputBuffer(); - edgeConf.write(dob); - byte[] userPayload = dob.getData(); - edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); - return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); - case CUSTOM_SIMPLE_EDGE: - assert partitionerClassName != null; - partitionerConf = createPartitionerConf(partitionerClassName, conf); - UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - return et3Conf.createDefaultEdgeProperty(); - case SIMPLE_EDGE: - default: - assert partitionerClassName != null; - 
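The BytesBytesMultiHashMap hunks above add lightweight self-accounting: memMax keeps the cap computed in the constructor from the memUsage argument, memUsed starts at the ref array plus the initial write buffer, and every rehash adjusts it, logging a warning when the allocation is exceeded. A compact sketch of that bookkeeping (the real class interleaves it with the rehash itself; the class name here is illustrative):

    final class HashMapMemoryAccountingSketch {
      private final long memMax;   // cap derived in the constructor, as in the patch
      private long memUsed;        // tracked in bytes

      HashMapMemoryAccountingSketch(int initialCapacity, int wbSize, long memMax) {
        this.memMax = memMax;
        // 8 bytes per long slot in refs, plus the first write buffer
        this.memUsed = (long) initialCapacity * (Long.SIZE / 8) + wbSize;
      }

      // called when the ref array grows from oldLength to newLength slots
      void onExpand(int oldLength, int newLength) {
        memUsed += ((long) newLength - oldLength) * (Long.SIZE / 8);
        if (memUsed > memMax) {
          // LOG.warn in the patch
          System.err.println("Hash table occupying more memory: " + memUsed
              + " than allocated: " + memMax);
        }
      }
    }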
partitionerConf = createPartitionerConf(partitionerClassName, conf); - OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), - TezBytesComparator.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - return et4Conf.createDefaultEdgeProperty(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)) { + switch (edgeType) { + case BROADCAST_EDGE: + // setUnsortedMemoryRequirementConfig(edgeProp, conf); + UnorderedKVEdgeConfig et1Conf = + UnorderedKVEdgeConfig.newBuilder(keyClass, valClass).setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .done().build(); + return et1Conf.createDefaultBroadcastEdgeProperty(); + case CUSTOM_EDGE: + // setUnsortedMemoryRequirementConfig(edgeProp, conf); + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfig et2Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .done().configureOutput() + .setAvailableBufferSize((int) edgeProp.getOutputMemoryNeededMB()).done().build(); + EdgeManagerPluginDescriptor edgeDesc = + EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); + CustomEdgeConfiguration edgeConf = + new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); + DataOutputBuffer dob = new DataOutputBuffer(); + edgeConf.write(dob); + byte[] userPayload = dob.getData(); + edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); + return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); + case CUSTOM_SIMPLE_EDGE: + // setUnsortedMemoryRequirementConfig(edgeProp, conf); + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfig et3Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .done().configureOutput() + .setAvailableBufferSize((int) edgeProp.getOutputMemoryNeededMB()).done().build(); + return et3Conf.createDefaultEdgeProperty(); + case SIMPLE_EDGE: + default: + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + OrderedPartitionedKVEdgeConfig et4Conf = + OrderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), + 
TezBytesComparator.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .setPostMergeBufferFraction(0).done().configureOutput() + .setSortBufferSize((int) edgeProp.getOutputMemoryNeededMB()).done().build(); + return et4Conf.createDefaultEdgeProperty(); + } + } else { + switch (edgeType) { + case BROADCAST_EDGE: + UnorderedKVEdgeConfig et1Conf = + UnorderedKVEdgeConfig.newBuilder(keyClass, valClass).setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); + return et1Conf.createDefaultBroadcastEdgeProperty(); + case CUSTOM_EDGE: + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfig et2Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); + EdgeManagerPluginDescriptor edgeDesc = + EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); + CustomEdgeConfiguration edgeConf = + new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); + DataOutputBuffer dob = new DataOutputBuffer(); + edgeConf.write(dob); + byte[] userPayload = dob.getData(); + edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); + return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); + case CUSTOM_SIMPLE_EDGE: + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfig et3Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); + return et3Conf.createDefaultEdgeProperty(); + case SIMPLE_EDGE: + default: + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + OrderedPartitionedKVEdgeConfig et4Conf = + OrderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), + TezBytesComparator.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); + return et4Conf.createDefaultEdgeProperty(); + } } } + private void setUnsortedMemoryRequirementConfig(TezEdgeProperty edgeProp, Configuration conf) { + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, + edgeProp.getOutputMemoryNeededMB()); + conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, + edgeProp.getInputMemoryNeededFraction()); + } + /** * Utility method to create a stripped down configuration for the MR partitioner. 
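When hive.tez.enable.memory.manager is on, createEdgeProperty() above builds the same edge configurations as before but additionally pins the Tez runtime buffers to the fraction and size the planner attached to the edge (the unused setUnsortedMemoryRequirementConfig helper shows the configuration-based alternative that the commented-out call sites would use). The sorted-edge case, reduced to the memory-related builder calls from the patch and reusing the locals already in scope in that method (serialization setup omitted):

    OrderedPartitionedKVEdgeConfig sortedEdge = OrderedPartitionedKVEdgeConfig
        .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
        .setFromConfiguration(conf)
        .configureInput()
          // fraction of the task memory reserved for fetching shuffled input
          .setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction())
          // rely on the shuffle buffer only; no separate post-merge buffer
          .setPostMergeBufferFraction(0)
          .done()
        .configureOutput()
          // sort buffer sized from the planner's estimate, in MB (never below 1)
          .setSortBufferSize((int) edgeProp.getOutputMemoryNeededMB())
          .done()
        .build();

The unordered edges follow the same shape, with setAvailableBufferSize() taking the place of setSortBufferSize().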
* diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index f606ec0..314551d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -76,6 +76,8 @@ public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); private MapRecordSource[] sources; private final Map multiMRInputMap = new HashMap(); + private static Map connectOps = + new TreeMap(); private int position = 0; MRInputLegacy legacyMRInput = null; MultiMRInput mainWorkMultiMRInput = null; @@ -87,9 +89,6 @@ List cacheKeys; ObjectCache cache; - private static Map connectOps = - new TreeMap(); - public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception { super(jconf, context); ObjectCache cache = ObjectCacheFactory.getCache(jconf); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index c563d9d..5571692 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -123,6 +123,7 @@ protected void createOutputMap() { public List getMergeWorkList(final JobConf jconf, String key, String queryId, ObjectCache cache, List cacheKeys) throws HiveException { String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); + cacheKeys = new ArrayList(); if (prefixes != null) { List mergeWorkList = new ArrayList(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 39a83e3..55e0fb9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -416,8 +416,11 @@ private void computeMemoryLimits() { keyWrappersBatch.getKeysFixedSize() + aggregationBatchInfo.getAggregatorsFixedSize(); - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + maxMemory = conf.getMemoryNeeded(); + if (maxMemory == 0) { + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + } memoryThreshold = conf.getMemoryThreshold(); // Tests may leave this unitialized, so better set it to 1 if (memoryThreshold == 0.0f) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index d42b643..81c2878 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; +import org.apache.hadoop.hive.ql.plan.DummyStoreDesc; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -69,6 +70,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName()); + private long totalMemorySize; @Override /* @@ 
-162,13 +164,13 @@ MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); // map join operator by default has no bucket cols and num of reduce sinks // reduced by 1 - mapJoinOp -.setOpTraits(new OpTraits(null, -1, null)); + mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator childOp : mapJoinOp.getChildOperators()) { setAllChildrenTraitsToNull(childOp); } + mapJoinOp.getConf().setMemoryNeeded(totalMemorySize); return null; } @@ -302,6 +304,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont // insert the dummy store operator here DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator(); + dummyStoreOp.setConf(new DummyStoreDesc()); dummyStoreOp.setParentOperators(new ArrayList>()); dummyStoreOp.setChildOperators(new ArrayList>()); dummyStoreOp.getChildOperators().add(mergeJoinOp); @@ -338,6 +341,7 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon } MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition); + mapJoinOp.getConf().setMemoryNeeded(totalMemorySize); MapJoinDesc joinDesc = mapJoinOp.getConf(); joinDesc.setBucketMapJoin(true); @@ -594,6 +598,9 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c } pos++; } + if (bigTablePosition != -1) { + this.totalMemorySize = (totalSize / buckets); + } return bigTablePosition; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index f7e1dbc..7d52619 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -219,7 +219,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, } } } - TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets); + TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets, tableSize); if (mapJoinWork != null) { for (BaseWork myWork: mapJoinWork) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java new file mode 100644 index 0000000..948e90c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java @@ -0,0 +1,314 @@ +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.Stack; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; +import 
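ConvertJoinMapJoin above records how much memory the map join hash table will actually need: once a big-table position is found, totalMemorySize is set to the small-table size divided by the number of buckets, and that value is stored on the MapJoinDesc through setMemoryNeeded() on both the regular and the bucket map join paths (ReduceSinkMapJoinProc likewise hands the table size to the new TezEdgeProperty constructor). A worked sketch of the estimate, with purely illustrative numbers:

    final class MapJoinBudgetSketch {
      // totalSize: summed statistics data size of the small-table sides
      // buckets: bucket count used for the conversion check (1 when unbucketed)
      static long memoryNeeded(long totalSize, int buckets) {
        // each task only loads its bucket's share of the hash table
        return totalSize / buckets;
      }

      public static void main(String[] args) {
        // e.g. a 600 MB small-table estimate against a 4-bucket big table
        System.out.println(memoryNeeded(600L * 1024 * 1024, 4));   // 157286400 bytes (~150 MB)
      }
    }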
org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.yarn.api.records.Resource; + +public class MemoryDecider implements PhysicalPlanResolver { + + public class MemoryCalculator implements Dispatcher { + + private final HiveConf conf; + private final Resource resourceAvailable; + private long inputOutputBufferLimit = 0; + private long parallelism = 0; + private final long onePercentMemory; + private long remainingMemory; + private long totalAvailableMemory; + private final PhysicalContext pctx; + private long incomingSize; + private long outgoingSize; + private static final long RESOURCE_LIMIT = 50; // buffer/operator resources limited to 50% + private static final long DEFAULT_RESOURCE_SIZE = 10; + + public MemoryCalculator(PhysicalContext pctx) { + this.pctx = pctx; + this.conf = pctx.conf; + this.resourceAvailable = DagUtils.getContainerResource(conf); + this.totalAvailableMemory = resourceAvailable.getMemory() * 1024 * 1024; + this.onePercentMemory = totalAvailableMemory / 100; + } + + @SuppressWarnings("unchecked") + @Override + public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) + throws SemanticException { + Task currTask = (Task) nd; + if (currTask instanceof StatsTask) { + currTask = ((StatsTask) currTask).getWork().getSourceTask(); + } + if (currTask instanceof TezTask) { + TezWork work = ((TezTask) currTask).getWork(); + for (BaseWork w : work.getAllWork()) { + evaluateWork(work, w); + } + } + return null; + } + + private void evaluateWork(TezWork work, BaseWork w) throws SemanticException { + + if (w instanceof MapWork) { + evaluateMapWork(work, (MapWork) w); + } else if (w instanceof ReduceWork) { + evaluateReduceWork(work, (ReduceWork) w); + } else if (w instanceof MergeJoinWork) { + evaluateMergeWork(work, (MergeJoinWork) w); + } else { + throw new SemanticException("Unknown work type: " + w); + } + } + + // this needs to account for the other work items embedded within the merge join work + private void evaluateMergeWork(TezWork work, MergeJoinWork w) throws SemanticException { + long totalAvailableMemory = this.totalAvailableMemory; + this.totalAvailableMemory = totalAvailableMemory / w.getBaseWorkList().size(); + long totalIncomingSize = getInputMemoryRequired(work, w); + long totalOutgoingSize = getOutputMemoryRequired(work, w); + for (BaseWork baseWork : w.getBaseWorkList()) { + computeMemoryForOperators(work, baseWork, totalIncomingSize, totalOutgoingSize); + } + recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize); + } + + private void evaluateReduceWork(TezWork work, ReduceWork w) throws SemanticException { + parallelism = w.getNumReduceTasks(); + if (parallelism <= 0) { + parallelism = 1; + } + long totalIncomingSize = getInputMemoryRequired(work, w); + + long totalOutgoingSize = getOutputMemoryRequired(work, w); + + computeMemoryForOperators(work, w, totalIncomingSize, totalOutgoingSize); + + recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize); + } + + private void evaluateMapWork(TezWork work, MapWork w) throws SemanticException { + long maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE); + parallelism = 
w.getMemoryNeeded() / maxSplitSize; + if (parallelism <= 0) { + parallelism = 1; + } + long totalIncomingSize = getInputMemoryRequired(work, w); + + long totalOutgoingSize = getOutputMemoryRequired(work, w); + + computeMemoryForOperators(work, w, totalIncomingSize, totalOutgoingSize); + + recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize); + } + + private void recomputeMemoryForInputOutput(TezWork work, BaseWork w, long totalIncomingSize, + long totalOutgoingSize) { + List parentWorkList = work.getParents(w); + for (BaseWork parentWork : parentWorkList) { + TezEdgeProperty edgeProp = work.getEdgeProperty(parentWork, w); + if (parallelism != 0) { + totalIncomingSize = parallelism * totalIncomingSize; + } + edgeProp.setInputMemoryNeededFraction(parentWork.getMemoryNeeded() / totalIncomingSize); + } + + List childWorkList = work.getChildren(w); + if (childWorkList.isEmpty()) { + return; + } + for (BaseWork childWork : childWorkList) { + TezEdgeProperty edgeProp = work.getEdgeProperty(w, childWork); + // one size fits all? + edgeProp.setOutputMemoryNeeded(outgoingSize / childWorkList.size()); + } + } + + private long getInputMemoryRequired(TezWork work, BaseWork w) { + long totalIncomingSize = 0L; + List parentWorkList = work.getParents(w); + if (parentWorkList.isEmpty()) { + totalIncomingSize = onePercentMemory * DEFAULT_RESOURCE_SIZE; + return totalIncomingSize; + } + for (BaseWork parentWork : parentWorkList) { + TezEdgeProperty edgeProp = work.getEdgeProperty(parentWork, w); + if (edgeProp.getEdgeType() == EdgeType.SIMPLE_EDGE) { + // we need buffer space for incoming shuffled data + totalIncomingSize += edgeProp.getEstimatedTransferBytes() / parallelism; + } else { + totalIncomingSize += onePercentMemory; + } + } + return totalIncomingSize; + } + + private long getOutputMemoryRequired(TezWork work, BaseWork w) { + // FIXME + // 10% per input? + long totalOutgoingSize = onePercentMemory * DEFAULT_RESOURCE_SIZE; // default 10% + List childWorkList = work.getChildren(w); + for (BaseWork childWork : childWorkList) { + TezEdgeProperty edgeProp = work.getEdgeProperty(w, childWork); + EdgeType edgeType = edgeProp.getEdgeType(); + // we need estimate for only one outgoing edge because tez does replication of output. + // this breaks down if there is a case of different type of edges downstream - say + // sorted and unsorted edges. If we look at only the unsorted type of edge, we may end up + // underestimating the amount of memory required. + if (edgeType == EdgeType.SIMPLE_EDGE) { + totalOutgoingSize = edgeProp.getEstimatedTransferBytes(); + break; + } + } + + return totalOutgoingSize; + } + + private void computeMemoryForOperators(TezWork work, BaseWork w, long totalIncomingSize, + long totalOutgoingSize) throws SemanticException { + + // if the buffers require > 50% of memory, lets now limit the percent to 50% of memory. 
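MemoryCalculator above works per vertex: parallelism is the number of reduce tasks (or, for a map vertex, the work's memoryNeeded divided by the max split size), incoming buffer demand is each sorted parent edge's estimated transfer bytes divided by that parallelism (one percent of the container for other edge types, or a ten-percent default when there are no parents), and outgoing demand defaults to ten percent of the container unless a sorted child edge supplies an estimate. A small numeric sketch of those rules, assuming a 4 GB container:

    final class EdgeMemoryEstimateSketch {
      public static void main(String[] args) {
        long container = 4096L * 1024 * 1024;   // assumed 4 GB Tez container
        long onePercent = container / 100;

        // a reduce vertex running 8 reduce tasks
        long parallelism = 8;

        // one sorted parent edge estimated at 1 GB of shuffled data:
        // each task buffers roughly its share of the shuffle
        long incoming = (1024L * 1024 * 1024) / parallelism;   // 128 MB

        // a sorted child edge estimated at 512 MB drives the output buffer
        long outgoing = 512L * 1024 * 1024;
        long outgoingDefault = onePercent * 10;   // used when no sorted child edge exists

        System.out.println(incoming + " / " + outgoing + " (default " + outgoingDefault + ")");
      }
    }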
+ // if the operator pipeline has excess, this value can grow + boolean capped = false; + incomingSize = totalIncomingSize; + outgoingSize = totalOutgoingSize; + if ((totalIncomingSize + totalOutgoingSize) > (totalAvailableMemory / 2)) { + // capped to 50% till we see the operator pipeline + capped = true; + this.inputOutputBufferLimit = onePercentMemory * RESOURCE_LIMIT; + incomingSize = + totalIncomingSize * inputOutputBufferLimit / (totalIncomingSize + totalOutgoingSize); + outgoingSize = inputOutputBufferLimit - incomingSize; + } else { + // it needs amount of memory less than or equal to 50% + this.inputOutputBufferLimit = totalIncomingSize + totalOutgoingSize; + } + + this.remainingMemory = totalAvailableMemory - (this.inputOutputBufferLimit); + // fill in the operator memory requirements and find total + evaluateOperators(w, pctx); + if ((remainingMemory > 0) && (capped)) { + // operator tree had excess memory. We can use the excess for the inputs/outputs + long incomingIncrease = + totalIncomingSize * remainingMemory / (totalIncomingSize + totalOutgoingSize); + incomingSize += incomingIncrease; + outgoingSize += (remainingMemory - incomingIncrease); + } + } + + private long evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException { + // lets take a look at the operator memory requirements. + Dispatcher disp = + new DefaultRuleDispatcher(new DefaultRule(), new HashMap(), + null); + GraphWalker ogw = new DefaultGraphWalker(disp); + + ArrayList topNodes = new ArrayList(); + topNodes.addAll(w.getAllRootOperators()); + + LinkedHashMap nodeOutput = new LinkedHashMap(); + ogw.startWalking(topNodes, nodeOutput); + + return computeMemoryRequirements(nodeOutput.keySet()); + } + + private long computeMemoryRequirements(Set keySet) { + long retval = 0; + List> opList = new ArrayList>(); + long minMemory = + (keySet.size() > 100) ? totalAvailableMemory / keySet.size() : onePercentMemory; + long totalMemoryNeeded = 0; + for (Node nd : keySet) { + Operator op = (Operator) nd; + long memoryNeeded = 0; + + memoryNeeded = op.getConf().getMemoryNeeded(); + if (memoryNeeded == 0) { + memoryNeeded = minMemory; + } else { + memoryNeeded += minMemory; + opList.add(op); + } + + totalMemoryNeeded += memoryNeeded; + op.getConf().setMemoryNeeded(memoryNeeded); + } + + if (totalMemoryNeeded > remainingMemory) { + long minMemoryRequired = keySet.size() * minMemory; + remainingMemory -= minMemoryRequired; + totalMemoryNeeded -= minMemoryRequired; + for (Operator op : opList) { + long memNeeded = (op.getConf().getMemoryNeeded() * remainingMemory) / totalMemoryNeeded; + op.getConf().setMemoryNeeded(memNeeded); + retval += memNeeded; + } + remainingMemory -= retval; + retval += minMemoryRequired; + if (remainingMemory < 0) { + throw new IllegalStateException("Remaining memory cannot be negative"); + } + } else { + retval = totalMemoryNeeded; + remainingMemory -= totalMemoryNeeded; + } + return retval; + } + + public class DefaultRule implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
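computeMemoryForOperators and computeMemoryRequirements above then split the container: when the input and output buffers together ask for more than half of it they are capped at 50 percent and scaled proportionally, every operator receives at least a minimum slice (one percent, or an even share once there are more than 100 operators), operators with a statistics-based need share what remains in proportion to those needs, and any excess flows back to the buffers. A condensed sketch of the proportional split; the bookkeeping is simplified and the numbers are assumed:

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class OperatorBudgetSketch {
      public static void main(String[] args) {
        long remaining = 1024L * 1024 * 1024;   // assumed memory left after the buffer split
        long minMemory = 40L * 1024 * 1024;     // assumed per-operator floor (~1% of container)

        // statistics-based needs; the SEL operator declared nothing and only gets the floor
        Map<String, Long> needs = new LinkedHashMap<>();
        needs.put("GBY", 900L * 1024 * 1024);
        needs.put("RS", 600L * 1024 * 1024);
        needs.put("SEL", 0L);

        long floor = needs.size() * minMemory;   // everyone gets the floor first
        long declaredTotal = needs.values().stream().mapToLong(Long::longValue).sum();
        long pool = remaining - floor;           // what the declared needs compete for

        for (Map.Entry<String, Long> e : needs.entrySet()) {
          long share = e.getValue() == 0 ? 0 : e.getValue() * pool / declaredTotal;
          System.out.println(e.getKey() + " -> " + (minMemory + share) + " bytes");
        }
      }
    }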
nodeOutputs) throws SemanticException { + // do nothing for all operators + return null; + } + } + } + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + + pctx.getConf(); + + // create dispatcher and graph walker + Dispatcher disp = new MemoryCalculator(pctx); + TaskGraphWalker ogw = new TaskGraphWalker(disp); + + // get all the tasks nodes from root task + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getRootTasks()); + + // begin to walk through the task tree. + ogw.startWalking(topNodes, null); + return pctx; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 241e9d7..29d10e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -147,9 +147,12 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator root, if (reduceWork.isAutoReduceParallelism()) { edgeProp = new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, - reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer); + reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer, + reduceSink.getConf().getStatistics().getDataSize()); } else { - edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + edgeProp = + new TezEdgeProperty(EdgeType.SIMPLE_EDGE, reduceSink.getConf().getStatistics() + .getDataSize()); } tezWork.connect( diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 6db8220..1b243be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -135,6 +135,10 @@ public Object process(Node nd, Stack stack, context.rootToWorkMap.put(root, work); } + if (operator.getStatistics() != null) { + work.setMemoryNeeded(operator.getStatistics().getDataSize()); + } + // this is where we set the sort columns that we will be using for KeyValueInputMerge if (operator instanceof DummyStoreOperator) { work.addSortCols(root.getOpTraits().getSortCols().get(0)); @@ -379,9 +383,12 @@ public Object process(Node nd, Stack stack, if (rWork.isAutoReduceParallelism()) { edgeProp = new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, - rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); + rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer, rs + .getConf().getStatistics().getDataSize()); } else { - edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + edgeProp = + new TezEdgeProperty(EdgeType.SIMPLE_EDGE, rs.getConf().getStatistics() + .getDataSize()); } tezWork.connect(work, followingWork, edgeProp); context.connectedReduceSinks.add(rs); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index ea12990..d848b23 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism; import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits; import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck; +import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider; import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer; import 
org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; @@ -475,6 +476,10 @@ protected void optimizeTaskPlan(List> rootTasks, Pa } else { LOG.debug("Skipping stage id rearranger"); } + + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)) { + physicalCtx = new MemoryDecider().resolve(physicalCtx); + } return; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java index 0a83440..3615041 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java @@ -30,6 +30,8 @@ protected transient Statistics statistics; protected transient OpTraits opTraits; protected transient Map opProps; + protected long memNeeded = 0; + protected transient long memoryInUse = 0; static { PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps"); @@ -59,14 +61,17 @@ public void setVectorMode(boolean vm) { this.vectorMode = vm; } + @Override public OpTraits getTraits() { return opTraits; } + @Override public void setTraits(OpTraits opTraits) { this.opTraits = opTraits; } + @Override public Map getOpProps() { return opProps; } @@ -74,4 +79,14 @@ public void setTraits(OpTraits opTraits) { public void setOpProps(Map props) { this.opProps = props; } + + @Override + public long getMemoryNeeded() { + return memNeeded; + } + + @Override + public void setMemoryNeeded(long memNeeded) { + this.memNeeded = memNeeded; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java index 0f2855e..58bce5e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hive.common.util.AnnotationUtils; @@ -227,7 +226,7 @@ public boolean isAggregate() { } return false; } - + @Explain(displayName = "bucketGroup", displayOnlyOnTrue = true) public boolean getBucketGroup() { return bucketGroup; @@ -301,4 +300,12 @@ public void setDistinct(boolean isDistinct) { this.isDistinct = isDistinct; } + @Override + public long getMemoryNeeded() { + if (statistics != null) { + return statistics.getDataSize(); + } else { + return memNeeded; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java index fb4d3b4..16be499 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java @@ -28,4 +28,6 @@ public OpTraits getTraits(); public void setTraits(OpTraits opTraits); public Map getOpProps(); + public long getMemoryNeeded(); + public void setMemoryNeeded(long memoryNeeded); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 1891dff..2f8560d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -92,7 +92,7 @@ private float topNMemoryUsage = -1; private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded 
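The descriptor changes above give every OperatorDesc a memoryNeeded property, with GroupByDesc and ReduceSinkDesc overriding the getter to prefer the statistics-derived data size whenever statistics are attached; TezCompiler then runs MemoryDecider only when hive.tez.enable.memory.manager is set. The override shared by the two descriptors, shown on its own (statistics and memNeeded are the fields already on the descriptors):

    @Override
    public long getMemoryNeeded() {
      if (statistics != null) {
        return statistics.getDataSize();   // planner statistics win when available
      }
      return memNeeded;                    // otherwise whatever MemoryDecider assigned
    }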
//flag used to control how TopN handled for PTF/Windowing partitions. - private boolean isPTFReduceSink = false; + private boolean isPTFReduceSink = false; private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable public static enum ReducerTraits { @@ -417,7 +417,7 @@ public final void setReducerTraits(EnumSet traits) { // reducers or hash function. boolean wasUnset = this.reduceTraits.remove(ReducerTraits.UNSET); - + if (this.reduceTraits.contains(ReducerTraits.FIXED)) { return; } else if (traits.contains(ReducerTraits.FIXED)) { @@ -449,4 +449,13 @@ public boolean hasOrderBy() { public void setHasOrderBy(boolean hasOrderBy) { this.hasOrderBy = hasOrderBy; } + + @Override + public long getMemoryNeeded() { + if (statistics != null) { + return statistics.getDataSize(); + } else { + return memNeeded; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index a3aa12f..3546cad 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -30,33 +30,46 @@ CUSTOM_SIMPLE_EDGE, } - private HiveConf hiveConf; - private EdgeType edgeType; - private int numBuckets; + private static final long ONE_MB = 1024 * 1024; + + private final HiveConf hiveConf; + private final EdgeType edgeType; + private final int numBuckets; private boolean isAutoReduce; private int minReducer; private int maxReducer; private long inputSizePerReducer; + private final long estimatedTransferBytes; + private float inputMemoryNeededFraction; + private long outputMemoryNeeded; - public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, - int buckets) { + public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets, + long estimatedTransferBytes) { this.hiveConf = hiveConf; this.edgeType = edgeType; this.numBuckets = buckets; + this.estimatedTransferBytes = estimatedTransferBytes; + this.setInputMemoryNeededFraction(0.0f); + this.setOutputMemoryNeeded(0); } public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce, - int minReducer, int maxReducer, long bytesPerReducer) { - this(hiveConf, edgeType, -1); + int minReducer, int maxReducer, long bytesPerReducer, long estimatedTransferBytes) { + this(hiveConf, edgeType, -1, estimatedTransferBytes); this.minReducer = minReducer; this.maxReducer = maxReducer; this.isAutoReduce = isAutoReduce; this.inputSizePerReducer = bytesPerReducer; } + public TezEdgeProperty(EdgeType edgeType, long estimatedTransferBytes) { + this(null, edgeType, -1, estimatedTransferBytes); + } + + // called by test code only. 
public TezEdgeProperty(EdgeType edgeType) { - this(null, edgeType, -1); + this(edgeType, 0); } public EdgeType getEdgeType() { @@ -86,4 +99,36 @@ public int getMaxReducer() { public long getInputSizePerReducer() { return inputSizePerReducer; } + + public long getEstimatedTransferBytes() { + return estimatedTransferBytes; + } + + public void setInputMemoryNeededFraction(float inputMemoryNeededFraction) { + if (inputMemoryNeededFraction < 0 || inputMemoryNeededFraction > 1) { + this.inputMemoryNeededFraction = 1.0f; + return; + } + this.inputMemoryNeededFraction = inputMemoryNeededFraction; + } + + public float getInputMemoryNeededFraction() { + return inputMemoryNeededFraction; + } + + public long getOutputMemoryNeeded() { + return outputMemoryNeeded; + } + + public long getOutputMemoryNeededMB() { + if (outputMemoryNeeded < ONE_MB) { + return 1; + } else { + return outputMemoryNeeded / ONE_MB; + } + } + + public void setOutputMemoryNeeded(long outputMemoryNeeded) { + this.outputMemoryNeeded = outputMemoryNeeded; + } } diff --git ql/src/test/queries/clientpositive/bucket_map_join_tez1.q ql/src/test/queries/clientpositive/bucket_map_join_tez1.q index 42e26a8..afea926 100644 --- ql/src/test/queries/clientpositive/bucket_map_join_tez1.q +++ ql/src/test/queries/clientpositive/bucket_map_join_tez1.q @@ -1,6 +1,7 @@ set hive.auto.convert.join=true; set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=10000; +set hive.tez.enable.memory.manager=true; CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; diff --git ql/src/test/queries/clientpositive/tez_smb_1.q ql/src/test/queries/clientpositive/tez_smb_1.q index 580672f..25d9b21 100644 --- ql/src/test/queries/clientpositive/tez_smb_1.q +++ ql/src/test/queries/clientpositive/tez_smb_1.q @@ -4,6 +4,8 @@ set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=10000; set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ; + set hive.tez.enable.memory.manager=true; + CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
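TezEdgeProperty rounds out the plumbing: the input fraction is clamped so that anything outside [0, 1] falls back to 1.0, and the output requirement is reported in whole megabytes with a floor of 1 MB so the Tez buffer-size setters always receive a usable value; the two .q test updates simply enable the feature with set hive.tez.enable.memory.manager=true. A sketch of the two conversions (the class name is illustrative):

    final class EdgeMemoryUnitsSketch {
      private static final long ONE_MB = 1024 * 1024;

      // out-of-range fractions fall back to 1.0, as in setInputMemoryNeededFraction()
      static float clampFraction(float f) {
        return (f < 0 || f > 1) ? 1.0f : f;
      }

      // bytes to whole MB, never less than 1, as in getOutputMemoryNeededMB()
      static long toMB(long outputMemoryNeededBytes) {
        return outputMemoryNeededBytes < ONE_MB ? 1 : outputMemoryNeededBytes / ONE_MB;
      }
    }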