diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3963e1d..de9a9ea 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2105,7 +2105,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null), - "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."); + "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."), + HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", false, + "Enable memory manager for tez") + ; public final String varname; private final String defaultExpr; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index b20ae5c..20592cd 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -32,7 +32,7 @@ private final Arena[] arenas; private AtomicInteger allocatedArenas = new AtomicInteger(0); - private final MemoryManager memoryManager; + private final MemoryManagerInterface memoryManager; // Config settings private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas; @@ -41,7 +41,7 @@ private final boolean isDirect; private final LlapDaemonCacheMetrics metrics; - public BuddyAllocator(Configuration conf, MemoryManager memoryManager, + public BuddyAllocator(Configuration conf, MemoryManagerInterface memoryManager, LlapDaemonCacheMetrics metrics) { isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT); minAllocation = HiveConf.getIntVar(conf, 
ConfVars.LLAP_ORC_CACHE_MIN_ALLOC); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index 7b6d3c0..a3e5896 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -31,7 +31,7 @@ * reserve most of the time, by calling the evictor to evict some memory. releaseMemory is * called rarely. */ -public class LowLevelCacheMemoryManager implements MemoryManager { +public class LowLevelCacheMemoryManager implements MemoryManagerInterface { private final AtomicLong usedMemory; protected final long maxSize; private final LowLevelCachePolicy evictor; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java deleted file mode 100644 index d454ec8..0000000 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.llap.cache; - -public interface MemoryManager { - boolean reserveMemory(long memoryToReserve, boolean waitForEviction); - void releaseMemory(long memUsage); -} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManagerInterface.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManagerInterface.java new file mode 100644 index 0000000..8e7f2ce --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManagerInterface.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.cache; + +public interface MemoryManagerInterface { + boolean reserveMemory(long memoryToReserve, boolean waitForEviction); + void releaseMemory(long memUsage); +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 061e875..f856d5f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; @@ -96,7 +97,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi // 80% of memory considered for accounted buffers. Rest for objects. // TODO Tune this based on the available size. 
- this.memoryPerExecutor = (long)(totalMemoryAvailableBytes * 0.8 / (float) numExecutors); + this.memoryPerExecutor = (long)(totalMemoryAvailableBytes * 0.8 / numExecutors); this.metrics = metrics; try { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 86b1f5c..afc7b03 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -52,7 +52,7 @@ private final ContainerRunnerImpl containerRunner; private final LlapRegistryService registry; private final AtomicLong numSubmissions = new AtomicLong(0); - private JvmPauseMonitor pauseMonitor; + private final JvmPauseMonitor pauseMonitor; private final ObjectName llapDaemonInfoBean; private final LlapDaemonExecutorMetrics metrics; @@ -184,6 +184,7 @@ public void serviceStart() throws Exception { registry.registerWorker(); } + @Override public void serviceStop() throws Exception { // TODO Shutdown LlapIO shutdown(); @@ -312,6 +313,4 @@ public boolean isIoEnabled() { public long getMaxJvmMemory() { return maxJvmMemory; } - - } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java index ee6399b..78b914f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager; import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy; -import org.apache.hadoop.hive.llap.cache.MemoryManager; +import org.apache.hadoop.hive.llap.cache.MemoryManagerInterface; import 
org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; @@ -34,10 +34,10 @@ new ConcurrentHashMap(); private final ConcurrentHashMap stripeMetadata = new ConcurrentHashMap(); - private final MemoryManager memoryManager; + private final MemoryManagerInterface memoryManager; private final LowLevelCachePolicy policy; - public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy) { + public OrcMetadataCache(MemoryManagerInterface memoryManager, LowLevelCachePolicy policy) { this.memoryManager = memoryManager; this.policy = policy; } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java index 3bea70f..bbda0a0 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java @@ -38,7 +38,7 @@ private static final Log LOG = LogFactory.getLog(TestBuddyAllocator.class); private final Random rdm = new Random(2284); - private static class DummyMemoryManager implements MemoryManager { + private static class DummyMemoryManager implements MemoryManagerInterface { @Override public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) { return true; diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java index 9d769c5..a60b081 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java @@ -51,7 +51,7 @@ public void setEvictionListener(EvictionListener listener) { } } - private static class DummyMemoryManager implements MemoryManager { + private static class DummyMemoryManager implements MemoryManagerInterface { int allocs = 0; 
@Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecutionMemoryManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecutionMemoryManager.java new file mode 100644 index 0000000..677ea6f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecutionMemoryManager.java @@ -0,0 +1,86 @@ +package org.apache.hadoop.hive.ql.exec; + +import java.util.HashMap; +import java.util.Map; + + +public class ExecutionMemoryManager { + public static class MemInfo { + private long totalAvailable; + private long memConsumed; + + public MemInfo(long totalAvailable, long memConsumed) { + setTotalAvailable(totalAvailable); + setMemConsumed(memConsumed); + } + + public long getTotalAvailable() { + return totalAvailable; + } + + public void setTotalAvailable(long totalAvailable) { + this.totalAvailable = totalAvailable; + } + + public long getMemConsumed() { + return memConsumed; + } + + public void setMemConsumed(long memConsumed) { + this.memConsumed = memConsumed; + } + } + + private final Map<Operator<?>, MemInfo> opMemoryMap = new HashMap<Operator<?>, MemInfo>(); + private Operator<?> refOp = null; + + public void setMemoryInfo(Operator<?> op, long totalAvailable) { + MemInfo memInfo = new MemInfo(totalAvailable, 0L); + opMemoryMap.put(op, memInfo); + } + + public boolean canAllocate(Operator<?> op, long neededSize) { + MemInfo memInfo = opMemoryMap.get(op); + if (memInfo == null) { + throw new IllegalStateException("Operator has not initialized its memory requirement."); + } + + long newRequirement = memInfo.getMemConsumed() + neededSize; + if (newRequirement > memInfo.getTotalAvailable()) { + return false; + } else { + memInfo.setMemConsumed(newRequirement); + return true; + } + } + + public void setRefOp(Operator<?> refOp) { + this.refOp = refOp; + } + + public void releaseMemory(Operator<?> op) { + opMemoryMap.remove(op); + } + + public boolean canAllocate(long capacity) { + if (refOp == null) { + throw new IllegalStateException("Invalid state. Reference operator not set"); + } + return canAllocate(refOp, capacity); + } + + public void releaseMemory(long freed) { + if (refOp == null) { + throw new IllegalStateException("Invalid state. Reference operator not set"); + } + releaseMemory(refOp, freed); + } + + public void releaseMemory(Operator<?> op, long freed) { + MemInfo memInfo = opMemoryMap.get(op); + if (memInfo == null) { + return; + } + memInfo.setMemConsumed(memInfo.getMemConsumed() - freed); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 9867739..77aeaa0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -383,6 +383,11 @@ public static FastBitSet groupingSet2BitSet(int value) { newKeys = keyWrapperFactory.getKeyWrapper(); + maxMemory = conf.getMemoryNeeded(); + if (maxMemory == 0) { + memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + } firstRow = true; // estimate the number of hash table entries based on the size of each // entry.
Since the size of a entry @@ -390,8 +395,7 @@ public static FastBitSet groupingSet2BitSet(int value) { if (hashAggr) { computeMaxEntriesHashAggr(hconf); } - memoryMXBean = ManagementFactory.getMemoryMXBean(); - maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + memoryThreshold = this.getConf().getMemoryThreshold(); return result; } @@ -407,7 +411,7 @@ public static FastBitSet groupingSet2BitSet(int value) { **/ private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException { float memoryPercentage = this.getConf().getGroupByMemoryUsage(); - maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory()); + maxHashTblMemory = (long) (memoryPercentage * maxMemory); estimateRowSize(); } @@ -965,7 +969,7 @@ private void flushHashTable(boolean complete) throws HiveException { hashAggregations.clear(); hashAggregations = null; if (isLogInfoEnabled) { - LOG.info("Hash Table completed flushed"); + LOG.info("Hash Table completed flushed"); } return; } @@ -983,9 +987,9 @@ private void flushHashTable(boolean complete) throws HiveException { iter.remove(); numDel++; if (numDel * 10 >= oldSize) { - if (isLogInfoEnabled) { - LOG.info("Hash Table flushed: new size = " + hashAggregations.size()); - } + if (isLogInfoEnabled) { + LOG.info("Hash Table flushed: new size = " + hashAggregations.size()); + } return; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index a5f4e5b..069d91a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -109,8 +109,7 @@ public void process(Object row, int tag) throws HiveException { // Print a message if we reached at least 1000 rows for a join operand // We won't print a message for the last join operand since the size // will never goes to joinEmitInterval. 
- LOG.info("table " + alias + " has " + sz + " rows for join key " - + keyObject); + LOG.info("table " + alias + " has " + sz + " rows for join key " + keyObject); nextSz = getNextSize(nextSz); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 1a836bd..5b246fe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -279,8 +279,11 @@ public void generateMapMetaData() throws HiveException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); loader.init(mapContext, mrContext, hconf, this); - long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize() - * conf.getHashTableMemoryUsage()); + long memUsage = this.getConf().getMemoryNeeded(); + if (memUsage == 0) { + memUsage = + (long) (MapJoinMemoryExhaustionHandler.getMaxHeapSize() * conf.getHashTableMemoryUsage()); + } loader.load(mapJoinTables, mapJoinTableSerdes, memUsage); hashTblInitedOnce = true; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java index 4eca2d8..efa7ae8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java @@ -42,6 +42,7 @@ private static final Log logger = LogFactory.getLog("MapredContext"); private static final ThreadLocal contexts = new ThreadLocal(); + private final ExecutionMemoryManager memManager; public static MapredContext get() { return contexts.get(); @@ -74,6 +75,7 @@ protected MapredContext(boolean isMap, JobConf jobConf) { this.isMap = isMap; this.jobConf = jobConf; this.udfs = new ArrayList(); + this.memManager = new ExecutionMemoryManager(); } /** @@ -161,4 +163,8 @@ private boolean needClose(Closeable func) { return false; } } + + public ExecutionMemoryManager getMemoryManager() { + return memManager; + } } 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryManager.java new file mode 100644 index 0000000..414617b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryManager.java @@ -0,0 +1,14 @@ +package org.apache.hadoop.hive.ql.exec; + +public class MemoryManager { + + private static long memoryPerExecutor; + + public static void initialize(long memoryPerExecutor) { + MemoryManager.memoryPerExecutor = memoryPerExecutor; + } + + public static long getMemoryPerExecutor() { + return memoryPerExecutor; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java index 5d48651..dd879fd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java @@ -40,13 +40,13 @@ private ObjectCacheFactory() { public static ObjectCache getCache(Configuration conf) { if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { if (LlapIoProxy.isDaemon()) { // daemon - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) { - return new org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache(); - } else { // no cache - return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); - } + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) { + return new org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache(); + } else { // no cache + return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); + } } else { // container - return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache(); + return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache(); } } else { // mr or spark return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 5856cfd..b4a3966 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -323,6
+323,14 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) return; } + if (MapredContext.get() != null) { + // can happen with fetch tasks. + ExecutionMemoryManager memManager = MapredContext.get().getMemoryManager(); + if ((memManager != null) && (getConf() != null)) { + memManager.setMemoryInfo(this, this.getConf().getMemoryNeeded()); + } + } + this.configuration = hconf; if (!areAllParentsInitialized()) { return; @@ -392,9 +400,8 @@ private void completeInitialization(Collection> fs) throws HiveExcepti } /** - * This metod can be used to retrieve the results from async operations - * started at init time - before the operator pipeline is started. - * + * This method can be used to retrieve the results from async operations started at init time - + * before the operator pipeline is started. * @param os * @throws HiveException */ @@ -644,6 +651,8 @@ public void close(boolean abort) throws HiveException { * should overwrite this funtion for their specific cleanup routine. 
*/ protected void closeOp(boolean abort) throws HiveException { + ExecutionMemoryManager memManager = MapredContext.get().getMemoryManager(); + memManager.releaseMemory(this); } private boolean jobCloseDone = false; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java index f93b420..7b19cc5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java @@ -31,40 +31,44 @@ public class PTFTopNHash extends TopNHash { - + protected float memUsage; protected boolean isMapGroupBy; private Map partitionHeaps; private TopNHash largestPartition; private boolean prevIndexPartIsNull; private Set indexesWithNullPartKey; - - public void initialize( - int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) { - super.initialize(topN, memUsage, isMapGroupBy, collector); + private long maxMemory; + + @Override + public void initialize(int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, + long maxMemory) { + super.initialize(topN, memUsage, isMapGroupBy, collector, maxMemory); this.isMapGroupBy = isMapGroupBy; this.memUsage = memUsage; + this.maxMemory = maxMemory; partitionHeaps = new HashMap(); indexesWithNullPartKey = new HashSet(); } - + + @Override public int tryStoreKey(HiveKey key, boolean partColsIsNull) throws HiveException, IOException { prevIndexPartIsNull = partColsIsNull; return _tryStoreKey(key, partColsIsNull, -1); } - + private void updateLargest(TopNHash p) { if ( largestPartition == null || largestPartition.usage < p.usage) { largestPartition = p; } } - + private void findLargest() { for(TopNHash p : partitionHeaps.values() ) { updateLargest(p); } } - + public int _tryStoreKey(HiveKey key, boolean partColsIsNull, int batchIndex) throws HiveException, IOException { if (!isEnabled) { return FORWARD; // short-circuit quickly - forward all rows @@ -76,7 +80,7 @@ public int 
_tryStoreKey(HiveKey key, boolean partColsIsNull, int batchIndex) thr TopNHash partHeap = partitionHeaps.get(pk); if ( partHeap == null ) { partHeap = new TopNHash(); - partHeap.initialize(topN, memUsage, isMapGroupBy, collector); + partHeap.initialize(topN, memUsage, isMapGroupBy, collector, maxMemory); if ( batchIndex >= 0 ) { partHeap.startVectorizedBatch(batchSize); } @@ -101,7 +105,8 @@ public int _tryStoreKey(HiveKey key, boolean partColsIsNull, int batchIndex) thr } return r; } - + + @Override public void storeValue(int index, int hashCode, BytesWritable value, boolean vectorized) { Key pk = new Key(prevIndexPartIsNull, hashCode); TopNHash partHeap = partitionHeaps.get(pk); @@ -110,14 +115,16 @@ public void storeValue(int index, int hashCode, BytesWritable value, boolean vec usage = usage + partHeap.usage; updateLargest(partHeap); } - + + @Override public void flush() throws HiveException { if (!isEnabled || (topN == 0)) return; for(TopNHash partHash : partitionHeaps.values()) { partHash.flush(); } } - + + @Override public int startVectorizedBatch(int size) throws IOException, HiveException { if (!isEnabled) { return FORWARD; // short-circuit quickly - forward all rows @@ -137,7 +144,8 @@ public int startVectorizedBatch(int size) throws IOException, HiveException { indexesWithNullPartKey.clear(); return 0; } - + + @Override public void tryStoreVectorizedKey(HiveKey key, boolean partColsIsNull, int batchIndex) throws HiveException, IOException { _tryStoreKey(key, partColsIsNull, batchIndex); @@ -146,39 +154,43 @@ public void tryStoreVectorizedKey(HiveKey key, boolean partColsIsNull, int batch } batchIndexToResult[batchIndex] = key.hashCode(); } - + + @Override public int getVectorizedBatchResult(int batchIndex) { prevIndexPartIsNull = indexesWithNullPartKey.contains(batchIndex); Key pk = new Key(prevIndexPartIsNull, batchIndexToResult[batchIndex]); TopNHash partHeap = partitionHeaps.get(pk); return partHeap.getVectorizedBatchResult(batchIndex); } - + + 
@Override public HiveKey getVectorizedKeyToForward(int batchIndex) { prevIndexPartIsNull = indexesWithNullPartKey.contains(batchIndex); Key pk = new Key(prevIndexPartIsNull, batchIndexToResult[batchIndex]); TopNHash partHeap = partitionHeaps.get(pk); return partHeap.getVectorizedKeyToForward(batchIndex); } - + + @Override public int getVectorizedKeyDistLength(int batchIndex) { prevIndexPartIsNull = indexesWithNullPartKey.contains(batchIndex); Key pk = new Key(prevIndexPartIsNull, batchIndexToResult[batchIndex]); TopNHash partHeap = partitionHeaps.get(pk); return partHeap.getVectorizedKeyDistLength(batchIndex); } - + + @Override public int getVectorizedKeyHashCode(int batchIndex) { prevIndexPartIsNull = indexesWithNullPartKey.contains(batchIndex); Key pk = new Key(prevIndexPartIsNull, batchIndexToResult[batchIndex]); TopNHash partHeap = partitionHeaps.get(pk); return partHeap.getVectorizedKeyHashCode(batchIndex); } - + static class Key { boolean isNull; int hashCode; - + public Key(boolean isNull, int hashCode) { super(); this.isNull = isNull; @@ -205,11 +217,11 @@ public boolean equals(Object obj) { return false; return true; } - + @Override public String toString() { return "" + hashCode + "," + isNull; } - + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 468d87f..9a1a976 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -86,6 +86,7 @@ private boolean skipTag = false; private transient InspectableObject tempInspectableObject = new InspectableObject(); private transient int[] valueIndex; // index for value(+ from keys, - from values) + protected transient long maxMem; protected transient OutputCollector out; /** @@ -236,8 +237,12 @@ float memUsage = conf.getTopNMemoryUsage(); if (limit >= 0 && memUsage > 0) { + maxMem = conf.getMemoryNeeded(); + if (maxMem == 
0) { + maxMem = Runtime.getRuntime().maxMemory(); + } reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash; - reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this); + reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this, maxMem); } useUniformHash = conf.getReducerTraits().contains(UNIFORM); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java index 484006a..f830685 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java @@ -82,6 +82,7 @@ protected boolean isEnabled = false; private final Comparator C = new Comparator() { + @Override public int compare(Integer o1, Integer o2) { byte[] key1 = keys[o1]; byte[] key2 = keys[o2]; @@ -92,7 +93,7 @@ public int compare(Integer o1, Integer o2) { }; public void initialize( - int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) { + int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, long maxMemory) { assert topN >= 0 && memUsage > 0; assert !this.isEnabled; this.isEnabled = false; @@ -104,7 +105,7 @@ public void initialize( } // limit * 64 : compensation of arrays for key/value/hashcodes - this.threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - topN * 64; + this.threshold = (long) (memUsage * maxMemory) - topN * 64; if (threshold < 0) { return; } @@ -202,7 +203,7 @@ public void tryStoreVectorizedKey(HiveKey key, boolean partColsIsNull, int batch Integer collisionIndex = indexes.store(index); if (null != collisionIndex) { /* - * since there is a collision index will be used for the next value + * since there is a collision index will be used for the next value * so have the map point back to original index. 
*/ if ( indexes instanceof HashForGroup ) { @@ -286,7 +287,7 @@ public int getVectorizedKeyDistLength(int batchIndex) { public int getVectorizedKeyHashCode(int batchIndex) { return hashes[batchIndexToResult[batchIndex]]; } - + /** * Stores the value for the key in the heap. * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result. @@ -377,7 +378,7 @@ private void flushInternal() throws IOException, HiveException { } excluded = 0; } - + private interface IndexStore { int size(); /** @@ -395,21 +396,25 @@ private void flushInternal() throws IOException, HiveException { private class HashForRow implements IndexStore { private final MinMaxPriorityQueue indexes = MinMaxPriorityQueue.orderedBy(C).create(); + @Override public int size() { return indexes.size(); } // returns null always + @Override public Integer store(int index) { boolean result = indexes.add(index); assert result; return null; } + @Override public int removeBiggest() { return indexes.removeLast(); } + @Override public Iterable indexes() { Integer[] array = indexes.toArray(new Integer[indexes.size()]); Arrays.sort(array, 0, array.length, C); @@ -425,21 +430,25 @@ public int removeBiggest() { // TreeSet anyway uses TreeMap; so use plain TreeMap to be able to get value in collisions. 
private final TreeMap indexes = new TreeMap(C); + @Override public int size() { return indexes.size(); } // returns false if index already exists in map + @Override public Integer store(int index) { return indexes.put(index, index); } + @Override public int removeBiggest() { Integer last = indexes.lastKey(); indexes.remove(last); return last; } + @Override public Iterable indexes() { return indexes.keySet(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 3930597..742af97 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.debug.Utils; +import org.apache.hadoop.hive.ql.exec.ExecutionMemoryManager; +import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.WriteBuffers; @@ -118,7 +120,7 @@ * Write buffers for keys and values. For the description of the structure above, think * of this as one infinite byte buffer. */ - private WriteBuffers writeBuffers; + private final WriteBuffers writeBuffers; private final float loadFactor; @@ -144,7 +146,9 @@ * so we'd stop earlier on read collision. Need to profile on real queries. */ private long[] refs; - private int startingHashBitCount, hashBitCount; + private final int startingHashBitCount; + + private int hashBitCount; private int metricPutConflict = 0, metricGetConflict = 0, metricExpands = 0, metricExpandsMs = 0; @@ -154,8 +158,11 @@ * Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. 
*/ private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024; + private final ExecutionMemoryManager memManager; + public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize, long memUsage) { + memManager = MapredContext.get().getMemoryManager(); if (loadFactor < 0 || loadFactor > 1) { throw new AssertionError("Load factor must be between (0, 1]."); } @@ -164,7 +171,7 @@ public BytesBytesMultiHashMap(int initialCapacity, ? initialCapacity : nextHighestPowerOfTwo(initialCapacity); // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check. int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY - : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8); + : (int)Math.min(DEFAULT_MAX_CAPACITY, memUsage / 8); if (maxCapacity < initialCapacity || initialCapacity <= 0) { // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows initialCapacity = (Long.bitCount(maxCapacity) == 1) @@ -175,6 +182,9 @@ public BytesBytesMultiHashMap(int initialCapacity, startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity); this.loadFactor = loadFactor; refs = new long[initialCapacity]; + if (memManager.canAllocate(wbSize) == false) { + LOG.error("No more memory available for hash table building"); + } writeBuffers = new WriteBuffers(wbSize, MAX_WB_SIZE); resizeThreshold = (int)(initialCapacity * this.loadFactor); } @@ -185,7 +195,7 @@ public BytesBytesMultiHashMap(int initialCapacity, } public class ThreadSafeGetter { - private WriteBuffers.Position position = new WriteBuffers.Position(); + private final WriteBuffers.Position position = new WriteBuffers.Position(); public byte getValueResult(byte[] key, int offset, int length, BytesBytesMultiHashMap.Result hashMapResult) { return BytesBytesMultiHashMap.this.getValueResult(key, offset, length, hashMapResult, position); @@ -227,7 +237,7 @@ public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { private long readIndex; // A reference to the 
current row. - private WriteBuffers.ByteSegmentRef byteSegmentRef; + private final WriteBuffers.ByteSegmentRef byteSegmentRef; public Result() { hasRows = false; @@ -301,8 +311,8 @@ public void set(BytesBytesMultiHashMap hashMap, long firstOffset, boolean hasLis } /** - * Read the current value. - * + * Read the current value. + * * @return * The ByteSegmentRef to the current value read. */ @@ -579,13 +589,16 @@ public void expandAndRehashToTarget(int estimateNewRowCount) { } } - private static void validateCapacity(long capacity) { + private void validateCapacity(long capacity) { if (Long.bitCount(capacity) != 1) { throw new AssertionError("Capacity must be a power of two"); } if (capacity <= 0) { throw new AssertionError("Invalid capacity " + capacity); } + if (memManager.canAllocate(capacity) == false) { + throw new AssertionError("Cannot allocate " + capacity + " of memory"); + } } /** @@ -751,7 +764,7 @@ private void expandAndRehash() { long capacity = refs.length << 1; expandAndRehashImpl(capacity); } - + private void expandAndRehashImpl(long capacity) { long expandTime = System.currentTimeMillis(); final long[] oldRefs = refs; @@ -780,6 +793,7 @@ private void expandAndRehashImpl(long capacity) { int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode); maxSteps = Math.max(probeSteps, maxSteps); } + memManager.releaseMemory(refs.length); this.refs = newRefs; this.largestNumberOfSteps = maxSteps; this.hashBitCount = newHashBitCount; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 35baadf..92636e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -344,58 +344,140 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration Map partitionerConf; EdgeType edgeType = edgeProp.getEdgeType(); - switch (edgeType) { - case BROADCAST_EDGE: - UnorderedKVEdgeConfig 
et1Conf = UnorderedKVEdgeConfig - .newBuilder(keyClass, valClass) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - return et1Conf.createDefaultBroadcastEdgeProperty(); - case CUSTOM_EDGE: - assert partitionerClassName != null; - partitionerConf = createPartitionerConf(partitionerClassName, conf); - UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - EdgeManagerPluginDescriptor edgeDesc = - EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); - CustomEdgeConfiguration edgeConf = - new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); - DataOutputBuffer dob = new DataOutputBuffer(); - edgeConf.write(dob); - byte[] userPayload = dob.getData(); - edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); - return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); - case CUSTOM_SIMPLE_EDGE: - assert partitionerClassName != null; - partitionerConf = createPartitionerConf(partitionerClassName, conf); - UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - return et3Conf.createDefaultEdgeProperty(); - case SIMPLE_EDGE: - default: - assert partitionerClassName != null; - partitionerConf = createPartitionerConf(partitionerClassName, conf); - 
OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), - TezBytesComparator.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - return et4Conf.createDefaultEdgeProperty(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)) { + switch (edgeType) { + case BROADCAST_EDGE: + setUnsortedMemoryRequirementConfig(edgeProp, conf); + UnorderedKVEdgeConfig et1Conf = + UnorderedKVEdgeConfig.newBuilder(keyClass, valClass).setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .done().build(); + return et1Conf.createDefaultBroadcastEdgeProperty(); + case CUSTOM_EDGE: + setUnsortedMemoryRequirementConfig(edgeProp, conf); + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfig et2Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .done().configureOutput() + .setAvailableBufferSize((int) edgeProp.getOutputMemoryNeeded()).done().build(); + EdgeManagerPluginDescriptor edgeDesc = + EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); + CustomEdgeConfiguration edgeConf = + new 
CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); + DataOutputBuffer dob = new DataOutputBuffer(); + edgeConf.write(dob); + byte[] userPayload = dob.getData(); + edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); + return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); + case CUSTOM_SIMPLE_EDGE: + setUnsortedMemoryRequirementConfig(edgeProp, conf); + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfig et3Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .done().configureOutput() + .setAvailableBufferSize((int) edgeProp.getOutputMemoryNeeded()).done().build(); + return et3Conf.createDefaultEdgeProperty(); + case SIMPLE_EDGE: + default: + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, + edgeProp.getOutputMemoryNeeded()); + conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, + edgeProp.getInputMemoryNeededFraction()); + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + OrderedPartitionedKVEdgeConfig et4Conf = + OrderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), + TezBytesComparator.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .setPostMergeBufferFraction(0).done().configureOutput() + 
.setSortBufferSize((int) edgeProp.getOutputMemoryNeeded()).done().build(); + return et4Conf.createDefaultEdgeProperty(); + } + } else { + switch (edgeType) { + case BROADCAST_EDGE: + setUnsortedMemoryRequirementConfig(edgeProp, conf); + UnorderedKVEdgeConfig et1Conf = + UnorderedKVEdgeConfig.newBuilder(keyClass, valClass).setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .configureInput().setShuffleBufferFraction(edgeProp.getInputMemoryNeededFraction()) + .done().build(); + return et1Conf.createDefaultBroadcastEdgeProperty(); + case CUSTOM_EDGE: + setUnsortedMemoryRequirementConfig(edgeProp, conf); + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfig et2Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); + EdgeManagerPluginDescriptor edgeDesc = + EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); + CustomEdgeConfiguration edgeConf = + new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); + DataOutputBuffer dob = new DataOutputBuffer(); + edgeConf.write(dob); + byte[] userPayload = dob.getData(); + edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); + return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); + case CUSTOM_SIMPLE_EDGE: + setUnsortedMemoryRequirementConfig(edgeProp, conf); + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfig et3Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, 
MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); + return et3Conf.createDefaultEdgeProperty(); + case SIMPLE_EDGE: + default: + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + OrderedPartitionedKVEdgeConfig et4Conf = + OrderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), + TezBytesComparator.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); + return et4Conf.createDefaultEdgeProperty(); + } } } + private void setUnsortedMemoryRequirementConfig(TezEdgeProperty edgeProp, Configuration conf) { + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, + edgeProp.getOutputMemoryNeeded()); + conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, + edgeProp.getInputMemoryNeededFraction()); + } + /** * Utility method to create a stripped down configuration for the MR partitioner. 
* diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 2e15922..6d31440 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -65,6 +65,7 @@ public void init(ExecMapperContext context, MapredContext mrContext, Configurati this.tezContext = (TezContext) mrContext; this.hconf = hconf; this.desc = joinOp.getConf(); + mrContext.getMemoryManager().setRefOp(joinOp); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 44d5418..3916667 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.Callable; import org.apache.commons.logging.Log; @@ -88,9 +87,6 @@ List cacheKeys; ObjectCache cache; - private static Map connectOps = - new TreeMap(); - public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception { super(jconf, context); if (LlapIoProxy.isDaemon()) { // do not cache plan diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index c563d9d..5571692 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -123,6 +123,7 @@ protected void createOutputMap() { public List getMergeWorkList(final JobConf jconf, String key, String queryId, ObjectCache cache, List cacheKeys) throws HiveException { String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); + cacheKeys = new ArrayList(); if 
(prefixes != null) { List mergeWorkList = new ArrayList(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java index 0820893..3dcb8ae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java @@ -98,7 +98,6 @@ LOG.info("Original split size is " + rawSplits.length + " grouped split size is " + groupedSplits.length + ", for bucket: " + bucketId + " SplitSizeEstimator: " + splitSizeEstStr); - for (InputSplit inSplit : groupedSplits) { bucketGroupedSplitMultimap.put(bucketId, inSplit); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index bd4fb42..0f32956 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -416,8 +416,11 @@ private void computeMemoryLimits() { keyWrappersBatch.getKeysFixedSize() + aggregationBatchInfo.getAggregatorsFixedSize(); - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + maxMemory = conf.getMemoryNeeded(); + if (maxMemory == 0) { + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + } memoryThreshold = conf.getMemoryThreshold(); // Tests may leave this unitialized, so better set it to 1 if (memoryThreshold == 0.0f) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java index bcc14a8..09e70ef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; /** @@ -41,27 +40,18 @@ private static final ThreadLocal> threadLocal = new ThreadLocal>() { @Override protected synchronized Map initialValue() { - Map map = new HashMap(); + Map map = new HashMap(); map.put(DEFAULT_CONTEXT, new IOContext()); return map; } }; - private static IOContext get() { - return IOContext.threadLocal.get().get(DEFAULT_CONTEXT); - } - - /** - * Tez and MR use this map but are single threaded per JVM thus no synchronization is required. - */ - private static final Map inputNameIOContextMap = new HashMap(); - public static IOContext get(Configuration conf) { String inputName = conf.get(Utilities.INPUT_NAME); Map inputNameIOContextMap = threadLocal.get(); if (inputName == null) { - inputName = DEFAULT_CONTEXT; + inputName = DEFAULT_CONTEXT; } if (!inputNameIOContextMap.containsKey(inputName)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 8423698..32abb5b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -69,6 +69,8 @@ public class ConvertJoinMapJoin implements NodeProcessor { static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName()); + private static final long SCALING_FACTOR = 3; // 3x size scaling for hash tables + private long totalMemorySize; @Override /* @@ -162,13 +164,13 @@ MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); // map join operator by default has no bucket cols and num of reduce sinks // reduced by 1 - mapJoinOp -.setOpTraits(new OpTraits(null, -1, null)); + mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator childOp : mapJoinOp.getChildOperators()) 
{ setAllChildrenTraitsToNull(childOp); } + mapJoinOp.getConf().setMemoryNeeded(totalMemorySize); return null; } @@ -338,6 +340,7 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon } MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition); + mapJoinOp.getConf().setMemoryNeeded(totalMemorySize); MapJoinDesc joinDesc = mapJoinOp.getConf(); joinDesc.setBucketMapJoin(true); @@ -594,6 +597,9 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c } pos++; } + if (bigTablePosition != -1) { + this.totalMemorySize = (totalSize / buckets) * SCALING_FACTOR; + } return bigTablePosition; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index f7e1dbc..7d52619 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -219,7 +219,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, } } } - TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets); + TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets, tableSize); if (mapJoinWork != null) { for (BaseWork myWork: mapJoinWork) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index 0a22f20..475efb5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -85,9 +85,7 @@ public class LlapDecider implements PhysicalPlanResolver { protected static transient final Log LOG - = LogFactory.getLog(LlapDecider.class); - - private PhysicalContext physicalContext; + = LogFactory.getLog(LlapDecider.class); private HiveConf conf; @@ -102,17 +100,16 @@ class 
LlapDecisionDispatcher implements Dispatcher { - private PhysicalContext pctx; - private HiveConf conf; + private final HiveConf conf; public LlapDecisionDispatcher(PhysicalContext pctx) { - this.pctx = pctx; this.conf = pctx.getConf(); } @Override public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) - throws SemanticException { + throws SemanticException { + @SuppressWarnings("unchecked") Task currTask = (Task) nd; if (currTask instanceof TezTask) { TezWork work = ((TezTask) currTask).getWork(); @@ -123,31 +120,27 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) return null; } - private void handleWork(TezWork tezWork, BaseWork work) - throws SemanticException { + private void handleWork(TezWork tezWork, BaseWork work) throws SemanticException { if (evaluateWork(tezWork, work)) { convertWork(tezWork, work); } } - private void convertWork(TezWork tezWork, BaseWork work) - throws SemanticException { + private void convertWork(TezWork tezWork, BaseWork work) throws SemanticException { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_AUTO_ALLOW_UBER)) { - // let's see if we can go one step further and just uber this puppy - if (tezWork.getChildren(work).isEmpty() - && work instanceof ReduceWork - && ((ReduceWork) work).getNumReduceTasks() == 1) { - work.setUberMode(true); - } + // let's see if we can go one step further and just uber this puppy + if (tezWork.getChildren(work).isEmpty() && work instanceof ReduceWork + && ((ReduceWork) work).getNumReduceTasks() == 1) { + work.setUberMode(true); + } } // always mark as llap work.setLlapMode(true); } - private boolean evaluateWork(TezWork tezWork, BaseWork work) - throws SemanticException { + private boolean evaluateWork(TezWork tezWork, BaseWork work) throws SemanticException { LOG.info("Evaluating work item: " + work.getName()); @@ -232,11 +225,12 @@ private boolean checkExpression(ExprNodeDesc expr) { ExprNodeDesc cur = exprs.removeFirst(); if (cur == null) continue; if 
(cur.getChildren() != null) { - exprs.addAll(cur.getChildren()); - } + exprs.addAll(cur.getChildren()); + } if (cur instanceof ExprNodeGenericFuncDesc) { - // getRequiredJars is currently broken (requires init in some cases before you can call it) + // getRequiredJars is currently broken (requires init in some cases before you can call + // it) // String[] jars = ((ExprNodeGenericFuncDesc)cur).getGenericUDF().getRequiredJars(); // if (jars != null && !(jars.length == 0)) { // LOG.info(String.format("%s requires %s", cur.getExprString(), Joiner.on(", ").join(jars))); @@ -254,7 +248,7 @@ private boolean checkExpression(ExprNodeDesc expr) { private boolean checkAggregator(AggregationDesc agg) throws SemanticException { if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Checking '%s'", agg.getExprString())); + LOG.debug(String.format("Checking '%s'", agg.getExprString())); } boolean result = checkExpressions(agg.getParameters()); @@ -277,12 +271,12 @@ private boolean checkExpressions(Collection exprs) { private boolean checkAggregators(Collection aggs) { boolean result = true; try { - for (AggregationDesc agg: aggs) { - result = result && checkAggregator(agg); - } + for (AggregationDesc agg : aggs) { + result = result && checkAggregator(agg); + } } catch (SemanticException e) { - LOG.warn("Exception testing aggregators.",e); - result = false; + LOG.warn("Exception testing aggregators.", e); + result = false; } return result; } @@ -291,38 +285,35 @@ private boolean checkAggregators(Collection aggs) { Map opRules = new LinkedHashMap(); opRules.put(new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + "%"), new NodeProcessor() { - public Object process(Node n, Stack s, NodeProcessorCtx c, - Object... os) { - return new Boolean(false); - } - }); - opRules.put(new RuleRegExp("No user code in fil", - FilterOperator.getOperatorName() + "%"), + @Override + public Object process(Node n, Stack s, NodeProcessorCtx c, Object... 
os) { + return new Boolean(false); + } + }); + opRules.put(new RuleRegExp("No user code in fil", FilterOperator.getOperatorName() + "%"), new NodeProcessor() { - public Object process(Node n, Stack s, NodeProcessorCtx c, - Object... os) { - ExprNodeDesc expr = ((FilterOperator)n).getConf().getPredicate(); - return new Boolean(checkExpression(expr)); - } - }); - opRules.put(new RuleRegExp("No user code in gby", - GroupByOperator.getOperatorName() + "%"), + @Override + public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { + ExprNodeDesc expr = ((FilterOperator) n).getConf().getPredicate(); + return new Boolean(checkExpression(expr)); + } + }); + opRules.put(new RuleRegExp("No user code in gby", GroupByOperator.getOperatorName() + "%"), new NodeProcessor() { - public Object process(Node n, Stack s, NodeProcessorCtx c, - Object... os) { - List aggs = ((GroupByOperator)n).getConf().getAggregators(); - return new Boolean(checkAggregators(aggs)); - } - }); - opRules.put(new RuleRegExp("No user code in select", - SelectOperator.getOperatorName() + "%"), + @Override + public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { + List aggs = ((GroupByOperator) n).getConf().getAggregators(); + return new Boolean(checkAggregators(aggs)); + } + }); + opRules.put(new RuleRegExp("No user code in select", SelectOperator.getOperatorName() + "%"), new NodeProcessor() { - public Object process(Node n, Stack s, NodeProcessorCtx c, - Object... os) { - List exprs = ((SelectOperator)n).getConf().getColList(); - return new Boolean(checkExpressions(exprs)); - } - }); + @Override + public Object process(Node n, Stack s, NodeProcessorCtx c, Object... 
os) { + List exprs = ((SelectOperator) n).getConf().getColList(); + return new Boolean(checkExpressions(exprs)); + } + }); return opRules; } @@ -362,8 +353,7 @@ private boolean checkParentsInLlap(TezWork tezWork, BaseWork base) { private boolean checkInputsVectorized(MapWork mapWork) { for (String path : mapWork.getPathToPartitionInfo().keySet()) { PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path); - List> interfaceList = - Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); + List> interfaceList = Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); if (!interfaceList.contains(VectorizedInputFormatInterface.class)) { LOG.info("Input format: " + pd.getInputFileFormatClassName() + ", doesn't provide vectorized input"); @@ -412,7 +402,6 @@ private long computeOutputSize(BaseWork base) { @Override public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { - this.physicalContext = pctx; this.conf = pctx.getConf(); this.mode = LlapMode.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_EXECUTION_MODE)); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java new file mode 100644 index 0000000..7669037 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java @@ -0,0 +1,303 @@ +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.Stack; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; 
+import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.yarn.api.records.Resource; + +public class MemoryDecider implements PhysicalPlanResolver { + + public class MemoryCalculator implements Dispatcher { + + private final HiveConf conf; + private final Resource resourceAvailable; + private long inputOutputBufferLimit = 0; + private long parallelism = 0; + private final long onePercentMemory; + private long remainingMemory; + private long totalAvailableMemory; + private final PhysicalContext pctx; + private long incomingSize; + private long outgoingSize; + private static final long RESOURCE_LIMIT = 50; // buffer/operator resources limited to 50% + private static final long DEFAULT_RESOURCE_SIZE = 10; + + public MemoryCalculator(PhysicalContext pctx) { + this.pctx = pctx; + this.conf = pctx.conf; + this.resourceAvailable = DagUtils.getContainerResource(conf); + this.totalAvailableMemory = resourceAvailable.getMemory() * 1024 * 1024; + this.onePercentMemory = totalAvailableMemory / 100; + } + + @Override + public Object dispatch(Node nd, Stack stack, Object... 
nodeOutputs) + throws SemanticException { + @SuppressWarnings("unchecked") + Task currTask = (Task) nd; + if (currTask instanceof TezTask) { + TezWork work = ((TezTask) currTask).getWork(); + for (BaseWork w : work.getAllWork()) { + evaluateWork(work, w); + } + } + return null; + } + + private void evaluateWork(TezWork work, BaseWork w) throws SemanticException { + + if (w instanceof MapWork) { + evaluateMapWork(work, (MapWork) w); + } else if (w instanceof ReduceWork) { + evaluateReduceWork(work, (ReduceWork) w); + } else if (w instanceof MergeJoinWork) { + evaluateMergeWork(work, (MergeJoinWork) w); + } else { + throw new SemanticException("Unknown work type: " + w); + } + } + + // FIXME + // this needs to account for the other work items embedded within the merge join work + private void evaluateMergeWork(TezWork work, MergeJoinWork w) throws SemanticException { + long totalAvailableMemory = this.totalAvailableMemory; + this.totalAvailableMemory = totalAvailableMemory / w.getBaseWorkList().size(); + evaluateWork(work, w.getMainWork()); + for (BaseWork baseWork : w.getBaseWorkList()) { + evaluateWork(work, baseWork); + } + } + + private void evaluateReduceWork(TezWork work, ReduceWork w) throws SemanticException { + parallelism = w.getNumReduceTasks(); + if (parallelism <= 0) { + parallelism = 1; + } + long totalIncomingSize = getInputMemoryRequired(work, w); + + long totalOutgoingSize = getOutputMemoryRequired(work, w); + + computeMemoryForOperators(work, w, totalIncomingSize, totalOutgoingSize); + } + + private void evaluateMapWork(TezWork work, MapWork w) throws SemanticException { + long maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE); + parallelism = w.getMemoryNeeded() / maxSplitSize; + if (parallelism <= 0) { + parallelism = 1; + } + long totalIncomingSize = getInputMemoryRequired(work, w); + + long totalOutgoingSize = getOutputMemoryRequired(work, w); + + computeMemoryForOperators(work, w, totalIncomingSize, 
totalOutgoingSize); + + recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize); + } + + private void recomputeMemoryForInputOutput(TezWork work, MapWork w, long totalIncomingSize, + long totalOutgoingSize) { + List parentWorkList = work.getParents(w); + for (BaseWork parentWork : parentWorkList) { + TezEdgeProperty edgeProp = work.getEdgeProperty(parentWork, w); + // cast before dividing: long/long division would truncate the fraction to 0 + edgeProp.setInputMemoryNeededFraction((float) parentWork.getMemoryNeeded() / totalIncomingSize); + } + + List childWorkList = work.getChildren(w); + if (childWorkList.isEmpty()) { + return; + } + for (BaseWork childWork : childWorkList) { + TezEdgeProperty edgeProp = work.getEdgeProperty(w, childWork); + // one size fits all? + edgeProp.setOutputMemoryNeeded(outgoingSize / childWorkList.size()); + } + } + + private long getInputMemoryRequired(TezWork work, BaseWork w) { + long totalIncomingSize = 0L; + List parentWorkList = work.getParents(w); + if (parentWorkList.isEmpty()) { + totalIncomingSize = onePercentMemory * DEFAULT_RESOURCE_SIZE; + return totalIncomingSize; + } + for (BaseWork parentWork : parentWorkList) { + TezEdgeProperty edgeProp = work.getEdgeProperty(parentWork, w); + if (edgeProp.getEdgeType() == EdgeType.SIMPLE_EDGE) { + // we need buffer space for incoming shuffled data + totalIncomingSize += w.getMemoryNeeded() / parallelism; + } else { + totalIncomingSize += onePercentMemory; + } + } + return totalIncomingSize; + } + + private long getOutputMemoryRequired(TezWork work, BaseWork w) { + // FIXME + // 10% per input? + long totalOutgoingSize = onePercentMemory * DEFAULT_RESOURCE_SIZE; // default 10% + List childWorkList = work.getChildren(w); + for (BaseWork childWork : childWorkList) { + TezEdgeProperty edgeProp = work.getEdgeProperty(w, childWork); + EdgeType edgeType = edgeProp.getEdgeType(); + // we need estimate for only one outgoing edge because tez does replication of output. 
+ // this breaks down if there is a case of different type of edges downstream - say + // sorted and unsorted edges. If we look at only the unsorted type of edge, we may end up + // underestimating the amount of memory required. + if (edgeType == EdgeType.SIMPLE_EDGE) { + totalOutgoingSize = edgeProp.getEstimatedTransferBytes(); + break; + } + } + + return totalOutgoingSize; + } + + private void computeMemoryForOperators(TezWork work, BaseWork w, long totalIncomingSize, + long totalOutgoingSize) + throws SemanticException { + + // if the buffers require > 50% of memory, lets now limit the percent to 50% of memory. + // if the operator pipeline has excess, this value can grow + boolean capped = false; + incomingSize = totalIncomingSize; + outgoingSize = totalOutgoingSize; + if ((totalIncomingSize + totalOutgoingSize) > (totalAvailableMemory / 2)) { + // capped to 50% till we see the operator pipeline + capped = true; + this.inputOutputBufferLimit = onePercentMemory * RESOURCE_LIMIT; + incomingSize = + totalIncomingSize * inputOutputBufferLimit / (totalIncomingSize + totalOutgoingSize); + outgoingSize = inputOutputBufferLimit - incomingSize; + } else { + // it needs amount of memory less than or equal to 50% + this.inputOutputBufferLimit = totalIncomingSize + totalOutgoingSize; + } + + this.remainingMemory = totalAvailableMemory - (this.inputOutputBufferLimit); + // fill in the operator memory requirements and find total + evaluateOperators(w, pctx); + if ((remainingMemory > 0) && (capped)) { + // operator tree had excess memory. We can use the excess for the inputs/outputs + long incomingIncrease = + totalIncomingSize * remainingMemory / (totalIncomingSize + totalOutgoingSize); + incomingSize += incomingIncrease; + outgoingSize += (remainingMemory - incomingIncrease); + } + } + + private long evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException { + // lets take a look at the operator memory requirements. 
+ Dispatcher disp = + new DefaultRuleDispatcher(new DefaultRule(), new HashMap(), + null); + GraphWalker ogw = new DefaultGraphWalker(disp); + + ArrayList topNodes = new ArrayList(); + topNodes.addAll(w.getAllRootOperators()); + + LinkedHashMap nodeOutput = new LinkedHashMap(); + ogw.startWalking(topNodes, nodeOutput); + + return computeMemoryRequirements(nodeOutput.keySet()); + } + + private long computeMemoryRequirements(Set keySet) { + long retval = 0; + List> opList = new ArrayList>(); + long minMemory = + (keySet.size() > 100) ? totalAvailableMemory / keySet.size() : onePercentMemory; + long totalMemoryNeeded = 0; + for (Node nd : keySet) { + Operator op = (Operator) nd; + long memoryNeeded = op.getConf().getMemoryNeeded(); + if (memoryNeeded == 0) { + memoryNeeded = minMemory; + } else { + memoryNeeded += minMemory; + opList.add(op); + } + + totalMemoryNeeded += memoryNeeded; + op.getConf().setMemoryNeeded(memoryNeeded); + } + + if (totalMemoryNeeded > remainingMemory) { + long minMemoryRequired = keySet.size() * minMemory; + remainingMemory -= minMemoryRequired; + totalMemoryNeeded -= minMemoryRequired; + for (Operator op : opList) { + long memNeeded = (op.getConf().getMemoryNeeded() * remainingMemory) / totalMemoryNeeded; + op.getConf().setMemoryNeeded(memNeeded); + retval += memNeeded; + } + remainingMemory -= retval; + retval += minMemoryRequired; + if (remainingMemory < 0) { + throw new IllegalStateException("Remaining memory cannot be negative"); + } + } else { + retval = totalMemoryNeeded; + remainingMemory -= totalMemoryNeeded; + } + return retval; + } + + public class DefaultRule implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) throws SemanticException { + // do nothing for all operators + return null; + } + } + } + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + + pctx.getConf(); + + // create dispatcher and graph walker + Dispatcher disp = new MemoryCalculator(pctx); + TaskGraphWalker ogw = new TaskGraphWalker(disp); + + // get all the tasks nodes from root task + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getRootTasks()); + + // begin to walk through the task tree. + ogw.startWalking(topNodes, null); + return pctx; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 4dcdf91..9252151 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -146,9 +146,12 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator root, if (reduceWork.isAutoReduceParallelism()) { edgeProp = new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, - reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer); + reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer, + reduceSink.getConf().getStatistics().getDataSize()); } else { - edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + edgeProp = + new TezEdgeProperty(EdgeType.SIMPLE_EDGE, reduceSink.getConf().getStatistics() + .getDataSize()); } tezWork.connect( diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 0990894..19e054a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -135,6 +135,10 @@ public Object process(Node nd, Stack stack, context.rootToWorkMap.put(root, work); } + if (operator.getStatistics() != null) { + 
work.setMemoryNeeded(operator.getStatistics().getDataSize()); + } + // this is where we set the sort columns that we will be using for KeyValueInputMerge if (operator instanceof DummyStoreOperator) { work.addSortCols(root.getOpTraits().getSortCols().get(0)); @@ -294,7 +298,7 @@ public Object process(Node nd, Stack stack, // finally hook everything up LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")"); - TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS, 0); tezWork.connect(unionWork, work, edgeProp); unionWork.addUnionOperators(context.currentUnionOperators); context.currentUnionOperators.clear(); @@ -376,9 +380,12 @@ public Object process(Node nd, Stack stack, if (rWork.isAutoReduceParallelism()) { edgeProp = new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, - rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); + rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer, rs + .getConf().getStatistics().getDataSize()); } else { - edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + edgeProp = + new TezEdgeProperty(EdgeType.SIMPLE_EDGE, rs.getConf().getStatistics() + .getDataSize()); } tezWork.connect(work, followingWork, edgeProp); context.connectedReduceSinks.add(rs); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index e37873d..3c6d963 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits; import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck; import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider; +import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider; import 
org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; @@ -482,6 +483,10 @@ protected void optimizeTaskPlan(List> rootTasks, Pa } else { LOG.debug("Skipping llap decider"); } + + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)) { + physicalCtx = new MemoryDecider().resolve(physicalCtx); + } return; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java index 0a83440..deead91 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java @@ -30,6 +30,7 @@ protected transient Statistics statistics; protected transient OpTraits opTraits; protected transient Map opProps; + protected long memNeeded = 0; static { PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps"); @@ -59,14 +60,17 @@ public void setVectorMode(boolean vm) { this.vectorMode = vm; } + @Override public OpTraits getTraits() { return opTraits; } + @Override public void setTraits(OpTraits opTraits) { this.opTraits = opTraits; } + @Override public Map getOpProps() { return opProps; } @@ -74,4 +78,14 @@ public void setTraits(OpTraits opTraits) { public void setOpProps(Map props) { this.opProps = props; } + + @Override + public long getMemoryNeeded() { + return memNeeded; + } + + @Override + public void setMemoryNeeded(long memNeeded) { + this.memNeeded = memNeeded; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java index 0f2855e..662e230 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java @@ -227,7 +227,7 @@ public boolean isAggregate() { } return false; } - + @Explain(displayName 
= "bucketGroup", displayOnlyOnTrue = true) public boolean getBucketGroup() { return bucketGroup; @@ -301,4 +301,12 @@ public void setDistinct(boolean isDistinct) { this.isDistinct = isDistinct; } + @Override + public long getMemoryNeeded() { + if (statistics != null) { + return statistics.getDataSize(); + } else { + return memNeeded; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java index fb4d3b4..16be499 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java @@ -28,4 +28,6 @@ public OpTraits getTraits(); public void setTraits(OpTraits opTraits); public Map getOpProps(); + public long getMemoryNeeded(); + public void setMemoryNeeded(long memoryNeeded); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 0eb4ab6..ef59f82 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -92,7 +92,7 @@ private float topNMemoryUsage = -1; private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded //flag used to control how TopN handled for PTF/Windowing partitions. - private boolean isPTFReduceSink = false; + private boolean isPTFReduceSink = false; private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable public static enum ReducerTraits { @@ -413,7 +413,7 @@ public final void setReducerTraits(EnumSet traits) { // reducers or hash function. 
boolean wasUnset = this.reduceTraits.remove(ReducerTraits.UNSET); - + if (this.reduceTraits.contains(ReducerTraits.FIXED)) { return; } else if (traits.contains(ReducerTraits.FIXED)) { @@ -437,4 +437,13 @@ public boolean isEnforceSort() { public void setEnforceSort(boolean isDeduplicated) { this.enforceSort = isDeduplicated; } + + @Override + public long getMemoryNeeded() { + if (statistics != null) { + return statistics.getDataSize(); + } else { + return memNeeded; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index a3aa12f..4e141ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -30,33 +30,44 @@ CUSTOM_SIMPLE_EDGE, } - private HiveConf hiveConf; - private EdgeType edgeType; - private int numBuckets; + private final HiveConf hiveConf; + private final EdgeType edgeType; + private final int numBuckets; private boolean isAutoReduce; private int minReducer; private int maxReducer; private long inputSizePerReducer; + private final long estimatedTransferBytes; + private float inputMemoryNeededFraction; + private long outputMemoryNeeded; - public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, - int buckets) { + public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets, + long estimatedTransferBytes) { this.hiveConf = hiveConf; this.edgeType = edgeType; this.numBuckets = buckets; + this.estimatedTransferBytes = estimatedTransferBytes; + this.setInputMemoryNeededFraction(0.0f); + this.setOutputMemoryNeeded(0); } public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce, - int minReducer, int maxReducer, long bytesPerReducer) { - this(hiveConf, edgeType, -1); + int minReducer, int maxReducer, long bytesPerReducer, long estimatedTransferBytes) { + this(hiveConf, edgeType, -1, estimatedTransferBytes); this.minReducer = minReducer; 
this.maxReducer = maxReducer; + this.isAutoReduce = isAutoReduce; + this.inputSizePerReducer = bytesPerReducer; + } + + public TezEdgeProperty(EdgeType edgeType, long estimatedTransferBytes) { + this(null, edgeType, -1, estimatedTransferBytes); + } + + // called by test code only. public TezEdgeProperty(EdgeType edgeType) { - this(null, edgeType, -1); + this(edgeType, 0); } public EdgeType getEdgeType() { @@ -86,4 +97,27 @@ public int getMaxReducer() { public long getInputSizePerReducer() { return inputSizePerReducer; } + + public long getEstimatedTransferBytes() { + return estimatedTransferBytes; + } + + public void setInputMemoryNeededFraction(float inputMemoryNeededFraction) { + if (inputMemoryNeededFraction < 0 || inputMemoryNeededFraction > 1) { + throw new IllegalArgumentException("Provided an invalid fraction for memory needed."); + } + this.inputMemoryNeededFraction = inputMemoryNeededFraction; + } + + public float getInputMemoryNeededFraction() { + return inputMemoryNeededFraction; + } + + public long getOutputMemoryNeeded() { + return outputMemoryNeeded; + } + + public void setOutputMemoryNeeded(long outputMemoryNeeded) { + this.outputMemoryNeeded = outputMemoryNeeded; + } } diff --git ql/src/test/queries/clientpositive/tez_smb_1.q ql/src/test/queries/clientpositive/tez_smb_1.q index 266a81b..294a65e 100644 --- ql/src/test/queries/clientpositive/tez_smb_1.q +++ ql/src/test/queries/clientpositive/tez_smb_1.q @@ -1,4 +1,5 @@ set hive.auto.convert.join=true; +set hive.tez.enable.memory.manager=true; set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=10000; set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;