diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 049e83713e..089b88cdb0 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4053,6 +4053,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     LLAP_ALLOCATOR_DEFRAG_HEADROOM("hive.llap.io.allocator.defrag.headroom", "1Mb",
         "How much of a headroom to leave to allow allocator more flexibility to defragment.\n" +
         "The allocator would further cap it to a fraction of total memory."),
+    LLAP_ALLOCATOR_MAX_FORCE_EVICTED("hive.llap.io.allocator.max.force.eviction", "16Mb",
+        "Fragmentation can lead to cases where more eviction has to happen to accommodate allocations.\n" +
+        "This configuration puts a limit on how many bytes to force-evict before falling back to the\n" +
+        "allocator's discard methods. Higher values give the allocator more flexibility and lead to better caching."),
     LLAP_TRACK_CACHE_USAGE("hive.llap.io.track.cache.usage", true,
         "Whether to tag LLAP cache contents, mapping them to Hive entities (paths for\n" +
         "partitions and tables) for reporting."),
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 60d6edfdcb..9e7a8dce64 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -48,8 +48,28 @@
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 
+/**
+ * High-level description, functionality, and memory layout.
+ * Allocations can be of any size but are rounded up to the next power of 2.
+ * Allocations smaller than {@link ConfVars#LLAP_ALLOCATOR_MIN_ALLOC} are rounded up to the minimum allocation size.
+ * Allocations bigger than {@link ConfVars#LLAP_ALLOCATOR_MAX_ALLOC} throw an exception.
+ * The allocator slices memory slabs called {@code Arena}s to carve out byte buffers using slice and position.
+ *
+ * Each {@code Arena} has an array of {@code freeList}s used for concurrency management and to index free buffers by size.
+ * Each {@code Arena} has a max size of {@link BuddyAllocator#MAX_ARENA_SIZE} (1GB).
+ *  \_ Each arena is divided into chunks of the max allocation size {@link ConfVars#LLAP_ALLOCATOR_MAX_ALLOC} (default 16MB).
+ *    \_ Each chunk of {@code maxAllocationSize} is sliced using the classical buddy-allocator algorithm.
+ *      \_ Each buddy-allocator tree can be split down to chunks of size {@link ConfVars#LLAP_ALLOCATOR_MIN_ALLOC} (4KB).
+ */
 public final class BuddyAllocator
-  implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapIoDebugDump {
+    implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapIoDebugDump {
+  private static final String FAILED_TO_ALLOCATE_MSG =
+      "Failed to allocate [{}]x[{}] bytes after [{}] attempts, evicted [{}] bytes and partially allocated [{}] bytes";
   private final Arena[] arenas;
   private final AtomicInteger allocatedArenas = new AtomicInteger(0);
@@ -65,6 +85,7 @@
   private final long maxSize;
   private final boolean isDirect;
   private final boolean isMapped;
+  private final int maxForcedEvictionSize;
   private final Path cacheDir;
 
   // These are only used for tests.
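The class Javadoc above describes the size rounding and the per-size freelist indexing. As a quick illustration, here is a standalone sketch of that arithmetic; the class and helper names are hypothetical and the constants are just the documented defaults (the patch's real logic lives in determineFreeListForAllocation):

// Standalone sketch of buddy-allocator size rounding; names and constants are illustrative.
public class BuddySizingSketch {
  static final int MIN_ALLOC = 4 * 1024;         // hive.llap.io.allocator default min (4KB)
  static final int MAX_ALLOC = 16 * 1024 * 1024; // hive.llap.io.allocator default max (16MB)

  /** Rounds a request up to the next power of 2, clamped below at MIN_ALLOC. */
  static int roundedSize(int size) {
    if (size <= 0 || size > MAX_ALLOC) {
      throw new IllegalArgumentException("size " + size + " out of range");
    }
    int rounded = Math.max(MIN_ALLOC, Integer.highestOneBit(size));
    return rounded < size ? rounded << 1 : rounded; // bump if size is not already a power of 2
  }

  /** Free lists are indexed by log2(size / MIN_ALLOC): 4KB -> 0, 8KB -> 1, ... */
  static int freeListIndex(int size) {
    return Integer.numberOfTrailingZeros(roundedSize(size) / MIN_ALLOC);
  }

  public static void main(String[] args) {
    System.out.println(roundedSize(5000));   // 8192
    System.out.println(freeListIndex(5000)); // 1
  }
}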
@@ -72,12 +93,23 @@
   // We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
   // That is guaranteed to fit any maximum allocation.
-  private static final int MAX_ARENA_SIZE = 1024*1024*1024;
+  private static final int MAX_ARENA_SIZE = 1024 * 1024 * 1024;
   // Don't try to operate with less than MIN_SIZE allocator space, it will just give you grief.
-  private static final int MIN_TOTAL_MEMORY_SIZE = 64*1024*1024;
+  private static final int MIN_TOTAL_MEMORY_SIZE = 64 * 1024 * 1024;
   // Maximum reasonable defragmentation headroom. Mostly kicks in on very small caches.
   private static final float MAX_DEFRAG_HEADROOM_FRACTION = 0.01f;
-
+  /**
+   * Max number of allocation attempts before giving up and moving to forced evictions and/or
+   * freelist/brute-force discards.
+   * Note to reader: why 5? Empirically, 2 and 5 gave the same results, hence the 5; better to retry more
+   * than to start evictions. Any value less than one will hurt.
+   */
+  private static final int MAX_FAST_ATTEMPT = 5;
+  /**
+   * Default max amount of bytes to force-evict on failed allocation attempts (16MB). Used for testing; for
+   * production, please adjust {@link ConfVars#LLAP_ALLOCATOR_MAX_FORCE_EVICTED}.
+   */
+  private static final int MAX_FORCED_EVICTION_SIZE = 1024 * 1024 * 16;
   private static final FileAttribute<Set<PosixFilePermission>> RWX = PosixFilePermissions
       .asFileAttribute(PosixFilePermissions.fromString("rwx------"));
   private final AtomicLong[] defragCounters;
@@ -86,24 +118,22 @@
   // Discard context that is cached for reuse per thread to avoid allocating lots of arrays,
   // and then resizing them down the line if we need a bigger size.
   // Only the IO threads need this, so there'd be at most a few dozen objects.
-  private final static ThreadLocal<DiscardContext> threadCtx = new ThreadLocal<DiscardContext>() {
-    protected DiscardContext initialValue() {
-      return new DiscardContext();
-    };
-  };
+  private final static ThreadLocal<DiscardContext> threadCtx = ThreadLocal.withInitial(DiscardContext::new);
 
   public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) {
     this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT),
         HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED),
-        (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
-        (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
+        (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
+        (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
         HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT),
         getMaxTotalMemorySize(conf),
         HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_DEFRAG_HEADROOM),
         HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED_PATH),
-        mm, metrics, HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_DISCARD_METHOD),
-        HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_PREALLOCATE)
-        );
+        mm,
+        metrics,
+        HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_DISCARD_METHOD),
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_PREALLOCATE),
+        (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_FORCE_EVICTED));
   }
 
   private static boolean areAssertsEnabled() {
@@ -123,11 +153,37 @@ private static long getMaxTotalMemorySize(Configuration conf) {
         + "disable LLAP IO entirely via " + ConfVars.LLAP_IO_ENABLED.varname);
   }
 
+  @VisibleForTesting
+  public BuddyAllocator(boolean isDirectVal,
+      boolean isMappedVal,
+      int minAllocVal,
+      int maxAllocVal,
+      int arenaCount,
+      long maxSizeVal,
+      long defragHeadroom,
+      String mapPath,
+      MemoryManager memoryManager,
+      LlapDaemonCacheMetrics metrics,
+      String discardMethod,
+      boolean doPreallocate) {
+    this(isDirectVal,
+        isMappedVal,
+        minAllocVal,
+        maxAllocVal,
+        arenaCount,
+        maxSizeVal,
+        defragHeadroom,
+        mapPath,
+        memoryManager,
+        metrics,
+        discardMethod,
+        doPreallocate,
+        MAX_FORCED_EVICTION_SIZE);
+  }
 
   @VisibleForTesting
   public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, int maxAllocVal,
       int arenaCount, long maxSizeVal, long defragHeadroom, String mapPath,
       MemoryManager memoryManager, LlapDaemonCacheMetrics metrics, String discardMethod,
-      boolean doPreallocate) {
+      boolean doPreallocate, int maxForcedEvictionSize) {
     isDirect = isDirectVal;
     isMapped = isMappedVal;
     minAllocation = minAllocVal;
@@ -174,6 +230,7 @@ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal,
     boolean isBoth = null == discardMethod || "both".equalsIgnoreCase(discardMethod);
     doUseFreeListDiscard = isBoth || "freelist".equalsIgnoreCase(discardMethod);
     doUseBruteDiscard = isBoth || "brute".equalsIgnoreCase(discardMethod);
+    this.maxForcedEvictionSize = maxForcedEvictionSize;
   }
 
   public long determineMaxMmSize(long defragHeadroom, long maxMmSize) {
@@ -251,37 +308,34 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
   @Override
   public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory,
       AtomicBoolean isStopped) throws AllocatorOutOfMemoryException {
-    assert size > 0 : "size is " + size;
-    if (size > maxAllocation) {
-      throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
-    }
+    checkAllocationSize(size);
     int freeListIx = determineFreeListForAllocation(size);
     int allocLog2 = freeListIx + minAllocLog2;
     int allocationSize = 1 << allocLog2;
     // If using async, we could also reserve one by one.
     memoryManager.reserveMemory(dest.length << allocLog2, isStopped);
-    for (int i = 0; i < dest.length; ++i) {
-      if (dest[i] != null) continue;
-      // Note: this is backward compat only. Should be removed with createUnallocated.
-      dest[i] = factory != null ? factory.create() : createUnallocated();
-    }
-
-    // First try to quickly lock some of the correct-sized free lists and allocate from them.
-    int arenaCount = allocatedArenas.get();
-    if (arenaCount < 0) {
-      arenaCount = -arenaCount - 1; // Next arena is being allocated.
-    }
+    initMemoryBuffers(dest, factory);
+    int arenaCount = getArenaCount();
     // Note: we might want to be smarter if threadId-s are low and there are more arenas than threads.
     long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
     int destAllocIx = allocateFast(dest, null, 0, dest.length, freeListIx, allocationSize,
         (int)(threadId % arenaCount), arenaCount);
-    if (destAllocIx == dest.length) return;
+    if (destAllocIx == dest.length) {
+      return;
+    }
+    // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
+    destAllocIx = allocateWithExpand(dest, destAllocIx, freeListIx, allocationSize, arenaCount);
+    if (destAllocIx == dest.length) {
+      return;
+    }
     // We called reserveMemory so we know that there's memory waiting for us somewhere.
-    // However, we have a class of rare race conditions related to the order of locking/checking of
-    // different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
+    // But that can mean that the reserved memory is fragmented and thus unusable,
+    // or that we hit a rare class of race conditions related to the order of locking/checking of
+    // different allocation areas; this is due to the fact that allocations arrive in bursts.
+    // Simple case - say we have 2 arenas, 256Kb available in arena 2.
     // We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
     // we look at arena 2 and find no memory. Or, for single arena, 2 threads reserve 256k each,
     // and a single 1Mb block is available. When the 1st thread locks the 1Mb freelist, the 2nd one
@@ -289,30 +343,66 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
     // into smaller lists after its split is done will not be found by (2); given that freelist
     // locks don't overlap, (2) may even run completely between the time (1) takes out the 1Mb
     // block and the time it returns the remaining 768Kb.
-    // Two solutions to this are some form of cross-thread helping (threads putting "demand"
-    // into some sort of queues that deallocate and split will examine), or having and "actor"
-    // allocator thread (or threads per arena).
-    // The 2nd one is probably much simpler and will allow us to get rid of a lot of sync code.
-    // But for now we will just retry. We will evict more each time.
-    int attempt = 0;
+    // To mitigate that, as of now:
+    // - First, we simply try again (fast path).
+    // - Second, we push on the memoryManager/evictor to evict more.
+    // - Third, we bypass the memory manager and use the discard methods to brute-force evict if needed.
+    int allocationAttempt = 0;
+    long totalForceEvictedBytes = 0;
+    long currentEvicted;
+    int startArenaIx;
+    int emptyAttempt = 0;
+    while (totalForceEvictedBytes < maxForcedEvictionSize && emptyAttempt < MAX_DISCARD_ATTEMPTS) {
+      startArenaIx = (int) ((threadId + allocationAttempt) % arenaCount);
+      if (allocationAttempt >= MAX_FAST_ATTEMPT) {
+        // Try to evict more, starting from (dest.length - destAllocIx) << allocLog2 bytes and growing the
+        // request with each attempt. Why over-evict? Query fragments usually trigger a burst of allocations
+        // at the same time, so it is worth evicting more: the next allocation gets a free lunch.
+        currentEvicted = memoryManager.evictMemory(
+            (dest.length - destAllocIx + allocationAttempt - MAX_FAST_ATTEMPT) << allocLog2);
+        // Note that memoryManager.evictMemory might evict 0 bytes; bail out after MAX_DISCARD_ATTEMPTS empty attempts.
+        totalForceEvictedBytes += currentEvicted;
+        emptyAttempt += currentEvicted == 0 ? 1 : 0;
+      }
+      destAllocIx = allocateFast(dest, null, destAllocIx, dest.length, freeListIx, allocationSize,
+          startArenaIx, arenaCount);
+      if (destAllocIx == dest.length) {
+        return;
+      }
+      destAllocIx = allocateWithSplit(dest, null, destAllocIx, dest.length, freeListIx, allocationSize,
+          startArenaIx, arenaCount, -1);
+      if (destAllocIx == dest.length) {
+        return;
+      }
+
+      allocationAttempt++;
+    }
+
+    LlapIoImpl.LOG.warn(FAILED_TO_ALLOCATE_MSG, dest.length, allocationSize, allocationAttempt,
+        totalForceEvictedBytes, destAllocIx << allocLog2);
+    int discardsAttempt = 0;
     boolean isFailed = false;
     int memoryForceReleased = 0;
     try {
       int discardFailed = 0;
       while (true) {
         // Try to split bigger blocks.
-        int startArenaIx = (int)((threadId + attempt) % arenaCount);
+        startArenaIx = (int) ((threadId + discardsAttempt + allocationAttempt) % arenaCount);
         destAllocIx = allocateWithSplit(dest, null, destAllocIx, dest.length, freeListIx,
             allocationSize, startArenaIx, arenaCount, -1);
         if (destAllocIx == dest.length) return;
-
-        if (attempt == 0) {
-          // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
-          destAllocIx = allocateWithExpand(
-              dest, destAllocIx, freeListIx, allocationSize, arenaCount);
-          if (destAllocIx == dest.length) return;
-        }
-
         // Try to force-evict the fragments of the requisite size.
         boolean hasDiscardedAny = false;
         DiscardContext ctx = threadCtx.get();
@@ -327,7 +417,9 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
           hasDiscardedAny = ctx.resultCount > 0;
           destAllocIx = allocateFromDiscardResult(
               dest, destAllocIx, freeListIx, allocationSize, ctx);
-          if (destAllocIx == dest.length) return;
+          if (destAllocIx == dest.length) {
+            return;
+          }
         }
         // Then, try the brute force search for something to throw away.
         if (doUseBruteDiscard) {
@@ -337,7 +429,9 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
           hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
           destAllocIx = allocateFromDiscardResult(
               dest, destAllocIx, freeListIx, allocationSize, ctx);
-          if (destAllocIx == dest.length) return;
+          if (destAllocIx == dest.length) {
+            return;
+          }
         }
 
         if (hasDiscardedAny) {
@@ -352,22 +446,47 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
             LlapIoImpl.LOG.info("Failed to deallocate after a partially successful allocate: " + dest[i]);
           }
         }
+          // Need to release the unallocated but reserved memory.
+          memoryManager.releaseMemory((dest.length - destAllocIx) << allocLog2);
           String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + dest.length
               + " (entire cache is fragmented and locked, or an internal issue)";
           logOomErrorMessage(msg);
           throw new AllocatorOutOfMemoryException(msg);
         }
-        ++attempt;
+        ++discardsAttempt;
       }
     } finally {
       memoryManager.releaseMemory(memoryForceReleased);
-      if (!isFailed && attempt >= LOG_DISCARD_ATTEMPTS) {
+      if (!isFailed && discardsAttempt >= LOG_DISCARD_ATTEMPTS) {
         LlapIoImpl.LOG.info("Allocation of " + dest.length + " buffers of size " + size + " took "
-            + attempt + " attempts to free enough memory; force-released " + memoryForceReleased);
+            + discardsAttempt + " attempts to free enough memory; force-released " + memoryForceReleased);
       }
     }
   }
 
+  private int getArenaCount() {
+    int arenaCount = allocatedArenas.get();
+    if (arenaCount < 0) {
+      arenaCount = -arenaCount - 1; // Next arena is being allocated.
+    }
+    return arenaCount;
+  }
+
+  private void initMemoryBuffers(MemoryBuffer[] dest, BufferObjectFactory factory) {
+    for (int i = 0; i < dest.length; ++i) {
+      if (dest[i] != null) continue;
+      // Note: this is backward compat only. Should be removed with createUnallocated.
+      dest[i] = factory != null ? factory.create() : createUnallocated();
+    }
+  }
+
+  private void checkAllocationSize(int size) {
+    assert size > 0 : "size is " + size;
+    if (size > maxAllocation) {
+      throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
+    }
+  }
+
   /** The context for the forced eviction of buffers. */
   private static final class DiscardContext {
     long[] results;
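The rewritten allocateMultiple above proceeds in three stages: fast retries, forced eviction bounded by maxForcedEvictionSize and by consecutive empty evictions, and finally the freelist/brute-force discard path. The following standalone sketch shows only the control-flow shape of that strategy; the Hooks interface and the linear growth of the eviction request are simplifying assumptions standing in for the real allocator and evictor:

// Control-flow sketch of the retry/evict/discard strategy; all hooks are hypothetical.
public class AllocRetrySketch {
  static final int MAX_FAST_ATTEMPT = 5;     // fast retries before pushing the evictor
  static final int MAX_DISCARD_ATTEMPTS = 5; // consecutive empty evictions before the fallback

  interface Hooks {
    boolean tryAllocate();     // one fast + split allocation pass
    long evict(long bytes);    // ask the evictor for bytes; may return 0
    boolean discardFallback(); // freelist/brute-force discard path
  }

  static boolean allocate(Hooks h, long requestBytes, long maxForcedEvictionSize) {
    long totalEvicted = 0;
    int attempt = 0;
    int emptyAttempt = 0;
    while (totalEvicted < maxForcedEvictionSize && emptyAttempt < MAX_DISCARD_ATTEMPTS) {
      if (attempt >= MAX_FAST_ATTEMPT) {
        // Grow the eviction request a little with each failed attempt.
        long evicted = h.evict(requestBytes + (long) (attempt - MAX_FAST_ATTEMPT) * requestBytes);
        totalEvicted += evicted;
        if (evicted == 0) {
          emptyAttempt++; // the evictor may be dry; bound how often we accept that
        }
      }
      if (h.tryAllocate()) {
        return true;
      }
      attempt++;
    }
    // Budget exhausted or the evictor is dry: bypass the memory manager and discard.
    return h.discardFallback();
  }
}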
@@ -738,8 +857,7 @@ private ByteBuffer preallocateArenaBuffer(int arenaSize) {
       rwf.setLength(arenaSize); // truncate (TODO: posix_fallocate?)
       // Use RW, not PRIVATE, because the copy-on-write is irrelevant for a deleted file
       // (see YARN-5551 for the memory accounting discussion).
-      ByteBuffer rwbuf = rwf.getChannel().map(MapMode.READ_WRITE, 0, arenaSize);
-      return rwbuf;
+      return rwf.getChannel().map(MapMode.READ_WRITE, 0, arenaSize);
     } catch (IOException ioe) {
       LlapIoImpl.LOG.warn("Failed trying to allocate memory mapped arena", ioe);
       // fail similarly when memory allocations fail
@@ -1278,7 +1396,6 @@ private int allocateWithSplit(int freeListIx, MemoryBuffer[] dest,
         }
         lastSplitBlocksRemaining >>>= 1;
         ++newListIndex;
-        continue;
       }
     }
     ++splitListIx;
@@ -1774,10 +1891,10 @@ public synchronized void dumpLog(boolean doSleep) {
     if (doSleep) {
       try {
         Thread.sleep(100);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignored) {
       }
     }
-    int logSize = (int)offset.get();
+    int logSize = offset.get();
     int ix = 0;
     while (ix < logSize) {
       ix = dumpOneLine(ix);
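For context on the mapped-arena hunk above: preallocateArenaBuffer maps a file READ_WRITE and returns the resulting buffer. A minimal self-contained sketch of the same pattern follows; the file naming and the immediate unlink are simplifying assumptions, not the patch's exact logic:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Files;
import java.nio.file.Path;

// Minimal sketch of a memory-mapped arena buffer; simplified relative to BuddyAllocator.
public class MappedArenaSketch {
  static ByteBuffer preallocate(Path cacheDir, int arenaSize) throws IOException {
    Path file = Files.createTempFile(cacheDir, "arena", ".cache");
    try (RandomAccessFile rwf = new RandomAccessFile(file.toFile(), "rw")) {
      rwf.setLength(arenaSize); // truncate the backing file to the arena size
      // READ_WRITE, not PRIVATE: copy-on-write is irrelevant for a file we delete right away.
      return rwf.getChannel().map(MapMode.READ_WRITE, 0, arenaSize);
    } finally {
      Files.delete(file); // on POSIX the mapping stays valid after the file is unlinked
    }
  }
}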
   *
   * @return max cache size
   */
-  public long getMaxCacheSize();
-}
\ No newline at end of file
+  long getMaxCacheSize();
+
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index c5b5bf2977..689a5d5c2b 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -66,6 +66,15 @@ public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) {
     throw new ReserveFailedException(isStopped);
   }
 
+  @Override public long evictMemory(long memoryToEvict) {
+    if (evictor == null) {
+      return 0;
+    }
+    long evicted = evictor.evictSomeBlocks(memoryToEvict);
+    releaseMemory(evicted);
+    return evicted;
+  }
+
   @VisibleForTesting
   public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction, AtomicBoolean isStopped) {
@@ -154,4 +163,9 @@ public long purge() {
     metrics.incrCacheCapacityUsed(-evicted);
     return evicted;
   }
+
+  @VisibleForTesting
+  public long getCurrentUsedSize() {
+    return usedMemory.get();
+  }
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index 0d16703011..18da2c49ca 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -57,4 +57,13 @@
    * allocate the space.
    */
   void reserveMemory(long memoryToReserve, AtomicBoolean isStopped);
+
+  /**
+   * Requests that the memory manager evict more memory; this call is blocking and might return 0
+   * if nothing was evicted.
+   *
+   * @param memoryToEvict amount of bytes to evict.
+   * @return actual amount of evicted bytes.
+   */
+  long evictMemory(long memoryToEvict);
+
 }
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index bdaa12a724..8b3ffddd34 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -62,6 +62,10 @@ public TestBuddyAllocator(boolean direct, boolean mmap) {
     public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
     }
 
+    @Override public long evictMemory(long memoryToEvict) {
+      return 0;
+    }
+
     @Override
     public void releaseMemory(long memUsage) {
     }
@@ -93,8 +97,7 @@ public void testSameSizes() throws Exception {
     }
   }
 
-  @Test
-  public void testMultipleArenas() throws Exception {
+  @Test public void testMultipleArenas() throws Exception {
     int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
     BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc,
         maxAlloc * arenaCount, 0, tmpDir, new DummyMemoryManager(),
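Since evictMemory may legitimately return 0, callers are expected to bound their retries, as the allocator does with its empty-attempt counter. A caller-side sketch of that contract; the class, helper name, and loop bound are illustrative, not part of the patch:

import org.apache.hadoop.hive.llap.cache.MemoryManager;

// Caller-side sketch of the evictMemory contract: stop after a few consecutive empty evictions.
public class EvictionLoopSketch {
  static long evictUpTo(MemoryManager mm, long wantedBytes, int maxEmptyAttempts) {
    long total = 0;
    int emptyAttempts = 0;
    while (total < wantedBytes && emptyAttempts < maxEmptyAttempts) {
      long evicted = mm.evictMemory(wantedBytes - total); // blocking; may return 0
      if (evicted == 0) {
        emptyAttempts++; // the evictor had nothing unlocked to give up
      } else {
        total += evicted;
      }
    }
    return total;
  }
}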
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java
new file mode 100644
index 0000000000..f4d9057cc2
--- /dev/null
+++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test the full cycle of allocation/accounting/eviction interactions.
+ */
+public class TestCacheAllocationsEvictionsCycles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestCacheAllocationsEvictionsCycles.class);
+  private static final LlapDaemonCacheMetrics CACHE_METRICS = LlapDaemonCacheMetrics.create("testCache", "testSession");
+
+  private final long maxSize = 1024;
+  private final LowLevelCache dataCache = Mockito.mock(LowLevelCache.class);
+  private final SerDeLowLevelCacheImpl serdCache = Mockito.mock(SerDeLowLevelCacheImpl.class);
+  private final MetadataCache metaDataCache = Mockito.mock(MetadataCache.class);
+
+  private BuddyAllocator allocator;
+  private MemoryManager memoryManager;
+  private LowLevelCachePolicy cachePolicy;
+  private EvictionTracker evictionTracker;
+
+  @Before public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    // Set lambda to 1 so the heap size becomes 1 (LRU).
+    conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+    int minBufferSize = 1;
+    cachePolicy = new LowLevelLrfuCachePolicy(minBufferSize, maxSize, conf);
+    memoryManager = new LowLevelCacheMemoryManager(maxSize, cachePolicy, CACHE_METRICS);
+    int maxAllocationSize = 1024;
+    int minAllocationSize = 8;
+    allocator =
+        new BuddyAllocator(true,
+            false,
+            minAllocationSize,
+            maxAllocationSize,
+            1,
+            maxSize,
+            0,
+            null,
+            memoryManager, CACHE_METRICS,
+            "no-force-eviction",
+            true);
+    EvictionDispatcher evictionDispatcher = new EvictionDispatcher(dataCache, serdCache, metaDataCache, allocator);
+    evictionTracker = new EvictionTracker(evictionDispatcher);
+    cachePolicy.setEvictionListener(evictionTracker);
+  }
+
+  @After public void tearDown() throws Exception {
+    LOG.info("Purge the cache on tear down");
+    cachePolicy.purge();
+    allocator = null;
+    memoryManager = null;
+    cachePolicy = null;
+  }
+
+  /**
+   * Test case to ensure that deallocate does merge small blocks into bigger ones.
+   */
+  @Test(timeout = 6_000L) public void testMergeOfBlocksAfterDeallocate() {
+    // Allocate blocks of size cacheSize/16, then deallocate them, then allocate blocks of size cacheSize/2.
+    MemoryBuffer[] dest = new MemoryBuffer[16];
+    for (MemoryBuffer memoryBuffer : dest) {
+      Assert.assertNull(memoryBuffer);
+    }
+    allocator.allocateMultiple(dest, 64, null);
+    // Check that everything is allocated.
+    Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+    for (MemoryBuffer value : dest) {
+      cachePolicy.notifyUnlock((LlapCacheableBuffer) value);
+    }
+    for (MemoryBuffer value : dest) {
+      Assert.assertTrue(value instanceof LlapDataBuffer);
+      LlapDataBuffer buffer = (LlapDataBuffer) value;
+      Assert.assertEquals(buffer.getMemoryUsage(), cachePolicy.evictSomeBlocks(buffer.getMemoryUsage()));
+      memoryManager.releaseMemory(buffer.getMemoryUsage());
+    }
+
+    // All is deallocated, thus the used size has to be zero.
+    Assert.assertEquals(0, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+    MemoryBuffer[] dest2 = new MemoryBuffer[2];
+    for (MemoryBuffer memoryBuffer : dest2) {
+      Assert.assertNull(memoryBuffer);
+    }
+    allocator.allocateMultiple(dest2, 512, null);
+    Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+    for (int i = 0; i < dest2.length; i++) {
+      Assert.assertNotNull(dest2[i]);
+      // we deallocate directly (not via eviction) to avoid extra memory manager free calls
+      allocator.deallocate(dest2[i]);
+    }
+    Assert.assertEquals(0, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+  }
+
+  @Test(timeout = 6_000L) public void testSimpleAllocateThenEvictThenAllocate() {
+    // Allocate the whole cache: 16 * 64.
+    MemoryBuffer[] dest = new MemoryBuffer[16];
+    for (MemoryBuffer memoryBuffer : dest) {
+      Assert.assertNull(memoryBuffer);
+    }
+    allocator.allocateMultiple(dest, 64, null);
+    Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+    for (MemoryBuffer buffer : dest) {
+      cachePolicy.notifyUnlock((LlapCacheableBuffer) buffer);
+    }
+    // allocate bigger blocks
+    dest = new MemoryBuffer[8];
+    allocator.allocateMultiple(dest, 128, null);
+    Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+    for (MemoryBuffer memoryBuffer : dest) {
+      Assert.assertNotNull(memoryBuffer);
+      allocator.deallocate(memoryBuffer);
+    }
+  }
+
+  @Test(timeout = 6_000L) public void testRandomFragmentation() {
+
+    MemoryBuffer[] memBuffers8B = new MemoryBuffer[64];
+    MemoryBuffer[] memBuffers16B = new MemoryBuffer[16];
+    MemoryBuffer[] memBuffers32B = new MemoryBuffer[8];
+    for (MemoryBuffer memoryBuffer : memBuffers8B) {
+      Assert.assertNull(memoryBuffer);
+    }
+
+    allocator.allocateMultiple(memBuffers8B, 8, null);
+    allocator.allocateMultiple(memBuffers16B, 16, null);
+    allocator.allocateMultiple(memBuffers32B, 32, null);
+    // all the cache is allocated: 64*8 + 16*16 + 8*32 = 1024 bytes
+    Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+
+    for (int i = 0; i < memBuffers8B.length; i++) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memBuffers8B[i];
+      // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted
+      cachePolicy.notifyUnlock(buffer);
+      // lock some buffers
+      if (i % 2 == 0) {
+        // lock the even buffers
+        buffer.incRef();
+      }
+    }
+
+    for (int i = 0; i < memBuffers16B.length; i++) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memBuffers16B[i];
+      // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted
+      cachePolicy.notifyUnlock(buffer);
+      // lock some buffers
+      if (i % 2 == 0) {
+        // lock the even buffers
+        buffer.incRef();
+      }
+    }
+
+    for (int i = 0; i < memBuffers32B.length; i++) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memBuffers32B[i];
+      // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted
+      cachePolicy.notifyUnlock(buffer);
+      // lock some buffers
+      if (i % 2 == 0) {
+        // lock the even buffers
+        buffer.incRef();
+      }
+    }
+    Assert.assertEquals(512, ((LowLevelCacheMemoryManager) memoryManager).purge());
+
+    for (MemoryBuffer memoryBuffer : memBuffers32B) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer;
+      // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted
+      if (buffer.isLocked()) {
+        buffer.decRef();
+      }
+      cachePolicy.notifyUnlock(buffer);
+    }
+
+    for (MemoryBuffer memoryBuffer : memBuffers16B) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer;
+      // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted
+      if (buffer.isLocked()) {
+        buffer.decRef();
+      }
+      cachePolicy.notifyUnlock(buffer);
+    }
+
+    for (MemoryBuffer memoryBuffer : memBuffers8B) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer;
+      // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted
+      if (buffer.isLocked()) {
+        buffer.decRef();
+      }
+      cachePolicy.notifyUnlock(buffer);
+    }
+    Assert.assertEquals(maxSize / 2, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+
+    memBuffers8B = new MemoryBuffer[64];
+    memBuffers16B = new MemoryBuffer[16];
+    memBuffers32B = new MemoryBuffer[8];
+    evictionTracker.getEvicted().clear();
+    allocator.allocateMultiple(memBuffers16B, 16, null);
+    allocator.allocateMultiple(memBuffers8B, 8, null);
+    allocator.allocateMultiple(memBuffers32B, 32, null);
+    Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+    Assert.assertEquals(memBuffers32B.length / 2 + memBuffers16B.length / 2 + memBuffers8B.length / 2,
+        evictionTracker.getEvicted().size());
+    for (MemoryBuffer memoryBuffer : memBuffers8B) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer;
+      allocator.deallocate(buffer);
+    }
+    for (MemoryBuffer memoryBuffer : memBuffers16B) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer;
+      allocator.deallocate(buffer);
+    }
+
+    for (MemoryBuffer memoryBuffer : memBuffers32B) {
+      LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer;
+      allocator.deallocate(buffer);
+    }
+  }
+
+  @Test(timeout = 6_000L) public void testFragmentation() {
+    MemoryBuffer[] dest = new MemoryBuffer[128];
+    for (MemoryBuffer memoryBuffer : dest) {
+      Assert.assertNull(memoryBuffer);
+    }
+    allocator.allocateMultiple(dest, 8, null);
+    // all the cache is allocated: 128 buffers * 8 bytes
+    Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+
+    for (int i = 0; i < dest.length; i++) {
+      LlapDataBuffer buffer = (LlapDataBuffer) dest[i];
+      // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted
+      cachePolicy.notifyUnlock(buffer);
+      // lock some buffers
+      if (i % 2 == 0) {
+        // lock the even buffers
+        buffer.incRef();
+      }
+    }
+
+    // purging the cache should evict only the unlocked buffers
+    Assert.assertEquals(512, ((LowLevelCacheMemoryManager) memoryManager).purge());
+    // After purge, the used memory should be aligned with the amount of evicted items.
+    Assert.assertEquals(512, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+
+    MemoryBuffer[] dest2 = new MemoryBuffer[1];
+    Exception exception = null;
+    try {
+      allocator.allocateMultiple(dest2, 16, null);
+    } catch (Allocator.AllocatorOutOfMemoryException e) {
+      // we should fail since we have the extreme case where half of the leaf nodes are locked,
+      // i.e. the maximum-fragmentation case
+      exception = e;
+    }
+    Assert.assertNotNull(exception);
+    // We need to make sure that the failed allocation attempt undoes the reserved memory.
+    // https://issues.apache.org/jira/browse/HIVE-21689
+    Assert.assertEquals(512, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize());
+    // unlock one buffer
+    Assert.assertTrue(((LlapDataBuffer) dest[0]).isLocked());
+    ((LlapDataBuffer) dest[0]).decRef();
+    evictionTracker.clear();
+    // this is needed since purge has removed the locked buffers from the list; the assumption is that when we
+    // unlock a buffer we notify the cache policy.
+    cachePolicy.notifyUnlock((LlapDataBuffer) dest[0]);
+    // we should be able to allocate after some extra eviction
+    allocator.allocateMultiple(dest2, 16, null);
+    // check that we have force-evicted something to make room for the new allocation
+    Assert.assertTrue(evictionTracker.getEvicted().size() >= 1);
+  }
+
+  private final class EvictionTracker implements EvictionListener {
+    private final EvictionListener evictionListener;
+
+    private final List<LlapCacheableBuffer> evicted = new ArrayList<>();
+
+    private EvictionTracker(EvictionListener evictionListener) {
+      this.evictionListener = evictionListener;
+    }
+
+    @Override public void notifyEvicted(LlapCacheableBuffer buffer) {
+      evicted.add(buffer);
+      evictionListener.notifyEvicted(buffer);
+    }
+
+    public List<LlapCacheableBuffer> getEvicted() {
+      return evicted;
+    }
+
+    public void clear() {
+      evicted.clear();
+    }
+  }
+}
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index ad6b90e358..e5953e2b95 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -27,8 +27,6 @@
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
@@ -86,6 +84,10 @@ public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
     ++allocs;
   }
 
+  @Override public long evictMemory(long memoryToEvict) {
+    return 0;
+  }
+
   @Override
   public void releaseMemory(long memUsage) {
   }
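The "maximum fragmentation" scenario in testFragmentation can be checked with a little buddy arithmetic: a leaf's merge partner is its index with the lowest bit flipped, so locking every even-indexed leaf leaves half the cache free yet unmergeable. A standalone illustration of that invariant (not part of the patch):

// Illustration: 128 leaves of 8 bytes; locking every even leaf blocks all merges,
// so 512 bytes are free yet no 16-byte (two-leaf) block can be formed.
public class FragmentationSketch {
  public static void main(String[] args) {
    boolean[] free = new boolean[128];
    for (int i = 0; i < free.length; i++) {
      free[i] = (i % 2 != 0); // odd leaves are free, even leaves are locked
    }
    boolean anyMergeablePair = false;
    for (int i = 0; i < free.length; i++) {
      int buddy = i ^ 1; // a leaf's buddy differs only in the lowest bit
      if (free[i] && free[buddy]) {
        anyMergeablePair = true;
      }
    }
    long freeBytes = 64L * 8; // 64 free leaves * 8 bytes each
    System.out.println("free bytes = " + freeBytes + ", mergeable pair exists = " + anyMergeablePair);
  }
}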