diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 049e83713e..089b88cdb0 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4053,6 +4053,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     LLAP_ALLOCATOR_DEFRAG_HEADROOM("hive.llap.io.allocator.defrag.headroom", "1Mb",
         "How much of a headroom to leave to allow allocator more flexibility to defragment.\n" +
         "The allocator would further cap it to a fraction of total memory."),
+    LLAP_ALLOCATOR_MAX_FORCE_EVICTED("hive.llap.io.allocator.max.force.eviction", "16Mb",
+        "Fragmentation can lead to cases where extra eviction has to happen to accommodate allocations.\n" +
+        "This configuration puts a limit on how many bytes to force-evict before falling back to the\n" +
+        "allocator discard method. Higher values give the allocator more flexibility and lead to better caching."),
     LLAP_TRACK_CACHE_USAGE("hive.llap.io.track.cache.usage", true,
         "Whether to tag LLAP cache contents, mapping them to Hive entities (paths for\n" +
         "partitions and tables) for reporting."),
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 60d6edfdcb..f23188c0b2 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -48,6 +48,24 @@
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 
+/**
+ *
+ * High-level description of the functionality and the memory layout.
+ * Allocations can be of any size, but each one is rounded up to the next power of 2.
+ * Allocations smaller than {@link ConfVars#LLAP_ALLOCATOR_MIN_ALLOC} are rounded up to the min allocation size.
+ * Allocations bigger than {@link ConfVars#LLAP_ALLOCATOR_MAX_ALLOC} throw an exception.
+ * The allocator slices a memory slab called an {@code Arena} to carve out byte buffers using slice and position.
+ *
+ * Each {@code Arena} has an array of {@code freeList}s used for concurrency management and to index free buffers by size.
+ *
+ * Each {@code Arena} has a max size of {@link BuddyAllocator#MAX_ARENA_SIZE} (1GB).
+ *  \_
+ *   Each arena is divided into chunks of the max allocation size {@link ConfVars#LLAP_ALLOCATOR_MAX_ALLOC} (default 16MB).
+ *    \_
+ *     Each chunk of {@code maxAllocationSize} is sliced using the classical buddy allocator algorithm.
+ *      \_
+ *       Each buddy allocator tree can be split down to chunks of size {@link ConfVars#LLAP_ALLOCATOR_MIN_ALLOC} (4KB).
+ */
 public final class BuddyAllocator
   implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapIoDebugDump {
   private final Arena[] arenas;
@@ -65,6 +83,7 @@
   private final long maxSize;
   private final boolean isDirect;
   private final boolean isMapped;
+  private final int maxForcedEvictionSize;
   private final Path cacheDir;
 
   // These are only used for tests.
@@ -72,12 +91,19 @@
 
   // We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
   // That is guaranteed to fit any maximum allocation.
-  private static final int MAX_ARENA_SIZE = 1024*1024*1024;
+  private static final int MAX_ARENA_SIZE = 1024 * 1024 * 1024;
   // Don't try to operate with less than MIN_SIZE allocator space, it will just give you grief.
-  private static final int MIN_TOTAL_MEMORY_SIZE = 64*1024*1024;
+  private static final int MIN_TOTAL_MEMORY_SIZE = 64 * 1024 * 1024;
   // Maximum reasonable defragmentation headroom. Mostly kicks in on very small caches.
   private static final float MAX_DEFRAG_HEADROOM_FRACTION = 0.01f;
-
+  /**
+   * Max number of allocation attempts to try before giving up and moving on to forced evictions and/or
+   * freelist/brute-force discards.
+   * Note to reader: the choice of 5 is empirical; 2 gave about the same results, so we prefer to retry a bit more
+   * before starting evictions. Any value less than one will hurt.
+   */
+  private static final int MAX_FAST_ATTEMPT = 5;
+  private static final int MAX_FORCED_EVICTION_SIZE = 1024 * 1024 * 16; // 16MB
   private static final FileAttribute<Set<PosixFilePermission>> RWX = PosixFilePermissions
       .asFileAttribute(PosixFilePermissions.fromString("rwx------"));
   private final AtomicLong[] defragCounters;
@@ -86,24 +112,22 @@
   // Discard context that is cached for reuse per thread to avoid allocating lots of arrays,
   // and then resizing them down the line if we need a bigger size.
   // Only the IO threads need this, so there'd be at most few dozen objects.
-  private final static ThreadLocal<DiscardContext> threadCtx = new ThreadLocal<DiscardContext>() {
-    protected DiscardContext initialValue() {
-      return new DiscardContext();
-    };
-  };
+  private final static ThreadLocal<DiscardContext> threadCtx = ThreadLocal.withInitial(DiscardContext::new);
 
   public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) {
     this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT),
         HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED),
-        (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
-        (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
+        (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
+        (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
         HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT),
         getMaxTotalMemorySize(conf),
         HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_DEFRAG_HEADROOM),
         HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED_PATH),
-        mm, metrics, HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_DISCARD_METHOD),
-        HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_PREALLOCATE)
-        );
+        mm,
+        metrics,
+        HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_DISCARD_METHOD),
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_PREALLOCATE),
+        (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_FORCE_EVICTED));
   }
 
   private static boolean areAssertsEnabled() {
@@ -123,11 +147,37 @@ private static long getMaxTotalMemorySize(Configuration conf) {
         + "disable LLAP IO entirely via " + ConfVars.LLAP_IO_ENABLED.varname);
   }
 
+  @VisibleForTesting public BuddyAllocator(boolean isDirectVal,
+      boolean isMappedVal,
+      int minAllocVal,
+      int maxAllocVal,
+      int arenaCount,
+      long maxSizeVal,
+      long defragHeadroom,
+      String mapPath,
+      MemoryManager memoryManager,
+      LlapDaemonCacheMetrics metrics,
+      String discardMethod,
+      boolean doPreallocate) {
+    this(isDirectVal,
+        isMappedVal,
+        minAllocVal,
+        maxAllocVal,
+        arenaCount,
+        maxSizeVal,
+        defragHeadroom,
+        mapPath,
+        memoryManager,
+        metrics,
+        discardMethod,
+        doPreallocate,
+        MAX_FORCED_EVICTION_SIZE);
+  }
   @VisibleForTesting
   public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, int maxAllocVal,
       int arenaCount, long maxSizeVal, long defragHeadroom, String mapPath,
       MemoryManager memoryManager, LlapDaemonCacheMetrics metrics, String discardMethod,
-      boolean doPreallocate) {
+      boolean doPreallocate, int maxForcedEvictionSize) {
     isDirect = isDirectVal;
     isMapped = isMappedVal;
     minAllocation = minAllocVal;
@@ -174,6 +224,7 @@ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal,
     boolean isBoth = null == discardMethod || "both".equalsIgnoreCase(discardMethod);
     doUseFreeListDiscard = isBoth || "freelist".equalsIgnoreCase(discardMethod);
     doUseBruteDiscard = isBoth || "brute".equalsIgnoreCase(discardMethod);
+    this.maxForcedEvictionSize = maxForcedEvictionSize;
   }
 
   public long determineMaxMmSize(long defragHeadroom, long maxMmSize) {
@@ -277,7 +328,74 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
     long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
     int destAllocIx = allocateFast(dest, null, 0, dest.length, freeListIx, allocationSize,
         (int)(threadId % arenaCount), arenaCount);
-    if (destAllocIx == dest.length) return;
+    if (destAllocIx == dest.length) {
+      return;
+    }
+    // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
+    destAllocIx = allocateWithExpand(dest, destAllocIx, freeListIx, allocationSize, arenaCount);
+    if (destAllocIx == dest.length) {
+      return;
+    }
+
+    int allocationAttempt = 0;
+    long totalForceEvictedBytes = 0;
+    long currentEvicted;
+    int startArenaIx;
+    int emptyAttempt = 0;
+
+    // We called reserveMemory, so we know that there's memory waiting for us somewhere.
+    // However, the reserved memory can be fragmented and thus unusable, or we can hit a common class of rare
+    // race conditions related to the order of locking/checking the different allocation areas,
+    // because allocations tend to arrive in bursts. To mitigate that:
+    // - First, we simply try again (fast path).
+    // - Second, we push on the memory manager/evictor to evict more.
+    // - Third, we bypass the memory manager and use the discard methods to brute-force evict
+    //   whatever is needed.
+    while (totalForceEvictedBytes < maxForcedEvictionSize && emptyAttempt < MAX_DISCARD_ATTEMPTS) {
+      // Make sure we start at a different arena on each new attempt.
+      startArenaIx = (int) ((threadId + allocationAttempt) % arenaCount);
+      if (allocationAttempt >= MAX_FAST_ATTEMPT) {
+        // Try to evict at least the memory we are still missing, (dest.length - destAllocIx) << allocLog2,
+        // and grow the eviction request with the allocation attempt counter. Query fragments tend to trigger
+        // bursts of allocations at the same time, so to avoid lock contention it is worth over-evicting:
+        // the next allocation then gets a free lunch.
+        currentEvicted =
+            memoryManager.evictMemory(dest.length - destAllocIx + allocationAttempt - MAX_FAST_ATTEMPT << allocLog2);
+        // Note that memoryManager.evictMemory is blocking, but it will eventually return 0 if all the memory
+        // is locked; in that case we count empty attempts and bail out after MAX_DISCARD_ATTEMPTS.
+        totalForceEvictedBytes += currentEvicted;
+        emptyAttempt += currentEvicted == 0 ? 1 : 0;
+      }
+      destAllocIx =
+          allocateFast(dest, null, destAllocIx, dest.length, freeListIx, allocationSize, startArenaIx, arenaCount);
+      if (destAllocIx == dest.length) {
+        return;
+      }
+      destAllocIx =
+          allocateWithSplit(dest,
+              null,
+              destAllocIx,
+              dest.length,
+              freeListIx,
+              allocationSize,
+              startArenaIx,
+              arenaCount,
+              -1);
+      if (destAllocIx == dest.length) {
+        return;
+      }
+
+      allocationAttempt++;
+    }
+
+    LlapIoImpl.LOG.warn(
+        "Failed to allocate [{}]x[{}] bytes after [{}] attempts; evicted [{}] bytes and partially allocated [{}] bytes",
+        dest.length,
+        allocationSize,
+        allocationAttempt,
+        totalForceEvictedBytes,
+        destAllocIx << allocLog2);
+
     // We called reserveMemory so we know that there's memory waiting for us somewhere.
     // However, we have a class of rare race conditions related to the order of locking/checking of
@@ -294,25 +412,18 @@
     // allocator thread (or threads per arena).
     // The 2nd one is probably much simpler and will allow us to get rid of a lot of sync code.
     // But for now we will just retry. We will evict more each time.
-    int attempt = 0;
+    int discardsAttempt = 0;
     boolean isFailed = false;
     int memoryForceReleased = 0;
     try {
       int discardFailed = 0;
       while (true) {
         // Try to split bigger blocks.
-        int startArenaIx = (int)((threadId + attempt) % arenaCount);
+        startArenaIx = (int)((threadId + discardsAttempt + allocationAttempt) % arenaCount);
         destAllocIx = allocateWithSplit(dest, null, destAllocIx, dest.length, freeListIx,
             allocationSize, startArenaIx, arenaCount, -1);
         if (destAllocIx == dest.length) return;
-        if (attempt == 0) {
-          // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
-          destAllocIx = allocateWithExpand(
-              dest, destAllocIx, freeListIx, allocationSize, arenaCount);
-          if (destAllocIx == dest.length) return;
-        }
-
         // Try to force-evict the fragments of the requisite size.
         boolean hasDiscardedAny = false;
         DiscardContext ctx = threadCtx.get();
@@ -327,7 +438,9 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
         hasDiscardedAny = ctx.resultCount > 0;
         destAllocIx = allocateFromDiscardResult(
             dest, destAllocIx, freeListIx, allocationSize, ctx);
-        if (destAllocIx == dest.length) return;
+        if (destAllocIx == dest.length) {
+          return;
+        }
       }
       // Then, try the brute force search for something to throw away.
       if (doUseBruteDiscard) {
@@ -337,7 +450,9 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
         hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
         destAllocIx = allocateFromDiscardResult(
             dest, destAllocIx, freeListIx, allocationSize, ctx);
-        if (destAllocIx == dest.length) return;
+        if (destAllocIx == dest.length) {
+          return;
+        }
       }
 
       if (hasDiscardedAny) {
@@ -352,18 +467,20 @@
             LlapIoImpl.LOG.info("Failed to deallocate after a partially successful allocate: " + dest[i]);
           }
         }
+        // Need to release the un-allocated but reserved memory.
+ memoryManager.releaseMemory(dest.length - destAllocIx << allocLog2); String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + dest.length + " (entire cache is fragmented and locked, or an internal issue)"; logOomErrorMessage(msg); throw new AllocatorOutOfMemoryException(msg); } - ++attempt; + ++discardsAttempt; } } finally { memoryManager.releaseMemory(memoryForceReleased); - if (!isFailed && attempt >= LOG_DISCARD_ATTEMPTS) { + if (!isFailed && discardsAttempt >= LOG_DISCARD_ATTEMPTS) { LlapIoImpl.LOG.info("Allocation of " + dest.length + " buffers of size " + size + " took " - + attempt + " attempts to free enough memory; force-released " + memoryForceReleased); + + discardsAttempt + " attempts to free enough memory; force-released " + memoryForceReleased); } } } @@ -738,8 +855,7 @@ private ByteBuffer preallocateArenaBuffer(int arenaSize) { rwf.setLength(arenaSize); // truncate (TODO: posix_fallocate?) // Use RW, not PRIVATE because the copy-on-write is irrelevant for a deleted file // see discussion in YARN-5551 for the memory accounting discussion - ByteBuffer rwbuf = rwf.getChannel().map(MapMode.READ_WRITE, 0, arenaSize); - return rwbuf; + return rwf.getChannel().map(MapMode.READ_WRITE, 0, arenaSize); } catch (IOException ioe) { LlapIoImpl.LOG.warn("Failed trying to allocate memory mapped arena", ioe); // fail similarly when memory allocations fail @@ -1278,7 +1394,6 @@ private int allocateWithSplit(int freeListIx, MemoryBuffer[] dest, } lastSplitBlocksRemaining >>>= 1; ++newListIndex; - continue; } } ++splitListIx; @@ -1774,10 +1889,10 @@ public synchronized void dumpLog(boolean doSleep) { if (doSleep) { try { Thread.sleep(100); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } - int logSize = (int)offset.get(); + int logSize = offset.get(); int ix = 0; while (ix < logSize) { ix = dumpOneLine(ix); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocatorMXBean.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocatorMXBean.java index 5e3cc50c1b..28a954a3c0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocatorMXBean.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocatorMXBean.java @@ -22,41 +22,41 @@ /** * MXbean to expose cache allocator related information through JMX. */ -@MXBean -public interface BuddyAllocatorMXBean { +@MXBean public interface BuddyAllocatorMXBean { /** * Gets if bytebuffers are allocated directly offheap. * * @return gets if direct bytebuffer allocation */ - public boolean getIsDirect(); + boolean getIsDirect(); /** * Gets minimum allocation size of allocator. * * @return minimum allocation size */ - public int getMinAllocation(); + int getMinAllocation(); /** * Gets maximum allocation size of allocator. * * @return maximum allocation size */ - public int getMaxAllocation(); + int getMaxAllocation(); /** * Gets the arena size. * * @return arena size */ - public int getArenaSize(); + int getArenaSize(); /** * Gets the maximum cache size. 
* * @return max cache size */ - public long getMaxCacheSize(); -} \ No newline at end of file + long getMaxCacheSize(); + +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index c5b5bf2977..c20831f351 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -66,6 +66,21 @@ public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) { throw new ReserveFailedException(isStopped); } + /** + * Ask the memory manager to evict more memory + * + * @param memoryToEvict amount of bytes to evict + * @return actual amount of evicted bytes. + */ + @Override public long evictMemory(long memoryToEvict) { + if (evictor == null) { + return 0; + } + long evicted = evictor.evictSomeBlocks(memoryToEvict); + releaseMemory(evicted); + return evicted; + } + @VisibleForTesting public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction, AtomicBoolean isStopped) { @@ -154,4 +169,9 @@ public long purge() { metrics.incrCacheCapacityUsed(-evicted); return evicted; } + + @VisibleForTesting + public long getCurrentUsedSize() { + return usedMemory.get(); + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java index 0d16703011..b2a42ad6ae 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java @@ -57,4 +57,13 @@ * allocate the space. */ void reserveMemory(long memoryToReserve, AtomicBoolean isStopped); + + /** + * Ask the memory manager to evict more memory + * + * @param memoryToReserve amount of bytes to evict + * @return actual amount of evicted bytes. 
+ */ + long evictMemory(long memoryToReserve); + } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java index bdaa12a724..ef3f33c3f0 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java @@ -62,6 +62,10 @@ public TestBuddyAllocator(boolean direct, boolean mmap) { public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { } + @Override public long evictMemory(long memoryToReserve) { + return 0; + } + @Override public void releaseMemory(long memUsage) { } @@ -93,8 +97,7 @@ public void testSameSizes() throws Exception { } } - @Test - public void testMultipleArenas() throws Exception { + @Test public void testMultipleArenas() throws Exception { int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5; BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount, 0, tmpDir, new DummyMemoryManager(), diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java new file mode 100644 index 0000000000..f3c535dd81 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.cache; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Test the full cycle of allocation/accounting/eviction interactions. 
+ */ +public class TestCacheAllocationsEvictionsCycles { + + private static final Logger LOG = LoggerFactory.getLogger(TestCacheAllocationsEvictionsCycles.class); + private static final LlapDaemonCacheMetrics cacheMetrics = LlapDaemonCacheMetrics.create("testCache", "testSession"); + + private final long maxSize = 1024; + private final LowLevelCache dataCache = Mockito.mock(LowLevelCache.class); + private final SerDeLowLevelCacheImpl serdCache = Mockito.mock(SerDeLowLevelCacheImpl.class); + private final MetadataCache metaDataCache = Mockito.mock(MetadataCache.class); + + private BuddyAllocator allocator; + private MemoryManager memoryManager; + private LowLevelCachePolicy cachePolicy; + private EvictionTracker evictionTracker; + + @Before public void setUp() throws Exception { + Configuration conf = new Configuration(); + // Set lambda to 1 so the heap size becomes 1 (LRU). + conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); + int minBufferSize = 1; + cachePolicy = new LowLevelLrfuCachePolicy(minBufferSize, maxSize, conf); + memoryManager = new LowLevelCacheMemoryManager(maxSize, cachePolicy, cacheMetrics); + int maxAllocationSize = 1024; + int minAllocationSize = 8; + allocator = + new BuddyAllocator(true, + false, + minAllocationSize, + maxAllocationSize, + 1, + maxSize, + 0, + null, + memoryManager, + cacheMetrics, + "no-force-eviction", + true); + EvictionDispatcher evictionDispatcher = new EvictionDispatcher(dataCache, serdCache, metaDataCache, allocator); + evictionTracker = new EvictionTracker(evictionDispatcher); + cachePolicy.setEvictionListener(evictionTracker); + } + + @After public void tearDown() throws Exception { + LOG.info("Purge the cache on tear down"); + cachePolicy.purge(); + allocator = null; + memoryManager = null; + cachePolicy = null; + } + + /** + * Test case to ensure that deallocate it does merge small blocks into bigger ones. 
+ */ + @Test(timeout = 6_000L) public void testMergeOfBlocksAfterDeallocate() { + // allocate blocks of cacheSize/16, Then deallocate then Allocate of size cacheSize/2 + MemoryBuffer[] dest = new MemoryBuffer[16]; + for (MemoryBuffer memoryBuffer : dest) { + Assert.assertNull(memoryBuffer); + } + allocator.allocateMultiple(dest, 64, null); + //Check that everything is allocated + Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + for (MemoryBuffer value : dest) { + cachePolicy.notifyUnlock((LlapCacheableBuffer) value); + } + for (MemoryBuffer value : dest) { + Assert.assertTrue(value instanceof LlapDataBuffer); + LlapDataBuffer buffer = (LlapDataBuffer) value; + Assert.assertEquals(buffer.getMemoryUsage(), cachePolicy.evictSomeBlocks(buffer.getMemoryUsage())); + memoryManager.releaseMemory(buffer.getMemoryUsage()); + } + + // All is deAllocated thus used has to be zero + Assert.assertEquals(0, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + MemoryBuffer[] dest2 = new MemoryBuffer[2]; + for (MemoryBuffer memoryBuffer : dest2) { + Assert.assertNull(memoryBuffer); + } + allocator.allocateMultiple(dest2, 512, null); + Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + for (int i = 0; i < dest2.length; i++) { + Assert.assertNotNull(dest[i]); + //we are not calling deallocate evict to avoid extra memory manager free calls + allocator.deallocate(dest2[i]); + } + Assert.assertEquals(0, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + } + + @Test(timeout = 6_000L) public void testSimpleAllocateThenEvictThenAllocate() { + // Allocate all the cache 16 * 64 + MemoryBuffer[] dest = new MemoryBuffer[16]; + for (MemoryBuffer memoryBuffer : dest) { + Assert.assertNull(memoryBuffer); + } + allocator.allocateMultiple(dest, 64, null); + Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + for (MemoryBuffer buffer : dest) { + cachePolicy.notifyUnlock((LlapCacheableBuffer) buffer); + } + // allocate bigger blocks + dest = new MemoryBuffer[8]; + allocator.allocateMultiple(dest, 128, null); + Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + for (MemoryBuffer memoryBuffer : dest) { + Assert.assertNotNull(memoryBuffer); + allocator.deallocate(memoryBuffer); + } + } + + @Test(timeout = 6_000L) public void testRandomFragmentation() { + + MemoryBuffer[] dest = new MemoryBuffer[64]; + MemoryBuffer[] dest_2 = new MemoryBuffer[16]; + MemoryBuffer[] dest_3 = new MemoryBuffer[8]; + for (MemoryBuffer memoryBuffer : dest) { + Assert.assertNull(memoryBuffer); + } + + allocator.allocateMultiple(dest, 8, null); + allocator.allocateMultiple(dest_2, 16, null); + allocator.allocateMultiple(dest_3, 32, null); + //all the cache is allocated with 8 X 128 + Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + + for (int i = 0; i < dest.length; i++) { + LlapDataBuffer buffer = (LlapDataBuffer) dest[i]; + // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted + cachePolicy.notifyUnlock(buffer); + // lock some buffers + if (i % 2 == 0) { + // lock the even buffers + buffer.incRef(); + } + } + + for (int i = 0; i < dest_2.length; i++) { + LlapDataBuffer buffer = (LlapDataBuffer) dest_2[i]; + // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted 
+ cachePolicy.notifyUnlock(buffer); + // lock some buffers + if (i % 2 == 0) { + // lock the even buffers + buffer.incRef(); + } + } + + for (int i = 0; i < dest_3.length; i++) { + LlapDataBuffer buffer = (LlapDataBuffer) dest_3[i]; + // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted + cachePolicy.notifyUnlock(buffer); + // lock some buffers + if (i % 2 == 0) { + // lock the even buffers + buffer.incRef(); + } + } + Assert.assertEquals(512, ((LowLevelCacheMemoryManager) memoryManager).purge()); + + for (MemoryBuffer memoryBuffer : dest_3) { + LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer; + // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted + if (buffer.isLocked()) { + buffer.decRef(); + } + cachePolicy.notifyUnlock(buffer); + } + + for (MemoryBuffer memoryBuffer : dest_2) { + LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer; + // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted + if (buffer.isLocked()) { + buffer.decRef(); + } + cachePolicy.notifyUnlock(buffer); + } + + for (MemoryBuffer memoryBuffer : dest) { + LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer; + // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted + if (buffer.isLocked()) { + buffer.decRef(); + } + cachePolicy.notifyUnlock(buffer); + } + Assert.assertEquals(maxSize / 2, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + + dest = new MemoryBuffer[64]; + dest_2 = new MemoryBuffer[16]; + dest_3 = new MemoryBuffer[8]; + evictionTracker.getEvicted().clear(); + allocator.allocateMultiple(dest_2, 16, null); + allocator.allocateMultiple(dest, 8, null); + allocator.allocateMultiple(dest_3, 32, null); + Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + Assert.assertEquals(dest_3.length / 2 + dest_2.length / 2 + dest.length / 2, evictionTracker.getEvicted().size()); + for (MemoryBuffer memoryBuffer : dest) { + LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer; + allocator.deallocate(buffer); + } + for (MemoryBuffer memoryBuffer : dest_2) { + LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer; + allocator.deallocate(buffer); + } + + for (MemoryBuffer memoryBuffer : dest_3) { + LlapDataBuffer buffer = (LlapDataBuffer) memoryBuffer; + allocator.deallocate(buffer); + } + } + + @Test(timeout = 6_000L) public void testFragmentation() { + MemoryBuffer[] dest = new MemoryBuffer[128]; + for (MemoryBuffer memoryBuffer : dest) { + Assert.assertNull(memoryBuffer); + } + allocator.allocateMultiple(dest, 8, null); + //all the cache is allocated with 8 X 128 + Assert.assertEquals(maxSize, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + + for (int i = 0; i < dest.length; i++) { + LlapDataBuffer buffer = (LlapDataBuffer) dest[i]; + // this is needed to make sure that the policy adds the buffers to the linked list as buffers ready to be evicted + cachePolicy.notifyUnlock(buffer); + // lock some buffers + if (i % 2 == 0) { + // lock the even buffers + buffer.incRef(); + } + } + + // purge the cache should lead to only evicting the unlocked buffers + Assert.assertEquals(512, ((LowLevelCacheMemoryManager) memoryManager).purge()); + // After purge the used memory should be aligned to the amount of evicted items + Assert.assertEquals(512, ((LowLevelCacheMemoryManager) 
memoryManager).getCurrentUsedSize()); + + MemoryBuffer[] dest2 = new MemoryBuffer[1]; + Exception exception = null; + try { + allocator.allocateMultiple(dest2, 16, null); + } catch (Allocator.AllocatorOutOfMemoryException e) { + // we should fail since we have the extreme case where half of the leaf nodes are locked thus + // maximum fragmentation case + exception = e; + } + Assert.assertNotNull(exception); + //We need to make sure that the failed allocation attempt undo the reserved memory. + //https://issues.apache.org/jira/browse/HIVE-21689 + Assert.assertEquals(512, ((LowLevelCacheMemoryManager) memoryManager).getCurrentUsedSize()); + // unlock one buffer + Assert.assertTrue(((LlapDataBuffer) dest[0]).isLocked()); + ((LlapDataBuffer) dest[0]).decRef(); + evictionTracker.clear(); + // this is needed since purge has removed the locked ones form the list, the assumption is that when we unlock + // a buffer we notify the cache policy. + cachePolicy.notifyUnlock((LlapDataBuffer) dest[0]); + // we should be able to allocate after some extra eviction + allocator.allocateMultiple(dest2, 16, null); + //we have to see that we have force evicted something to make room for the new allocation + Assert.assertTrue(evictionTracker.getEvicted().size() >= 1); + } + + private class EvictionTracker implements EvictionListener { + private final EvictionListener evictionListener; + + private List evicted = new ArrayList<>(); + + private EvictionTracker(EvictionListener evictionListener) { + this.evictionListener = evictionListener; + } + + @Override public void notifyEvicted(LlapCacheableBuffer buffer) { + evicted.add(buffer); + evictionListener.notifyEvicted(buffer); + } + + public List getEvicted() { + return evicted; + } + + public void clear() { + evicted.clear(); + } + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java index ad6b90e358..fa8a698c49 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java @@ -27,8 +27,6 @@ import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers; @@ -86,6 +84,10 @@ public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { ++allocs; } + @Override public long evictMemory(long memoryToReserve) { + return 0; + } + @Override public void releaseMemory(long memUsage) { }
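
Note on the allocation size rounding described in the new BuddyAllocator class comment: the sketch below is illustrative only and is not code from this patch; the class name and the 4KB/16MB constants are assumptions taken from the defaults mentioned in that comment.

import java.util.Arrays;

/** Illustrative only: mirrors the rounding rules described in the BuddyAllocator class comment. */
public class AllocationRoundingSketch {
  // Assumed defaults from the class comment: 4KB min allocation, 16MB max allocation.
  static final int MIN_ALLOCATION = 4 * 1024;
  static final int MAX_ALLOCATION = 16 * 1024 * 1024;

  /** Rounds a requested size up to the next power of two, clamped to the minimum allocation size. */
  static int roundAllocationSize(int requested) {
    if (requested <= 0 || requested > MAX_ALLOCATION) {
      throw new IllegalArgumentException("Unsupported allocation size: " + requested);
    }
    int size = Math.max(requested, MIN_ALLOCATION);
    return Integer.bitCount(size) == 1 ? size : Integer.highestOneBit(size) << 1;
  }

  /** Free-list index: 0 for the minimum allocation, +1 for every doubling of the size. */
  static int freeListIndex(int roundedSize) {
    return Integer.numberOfTrailingZeros(roundedSize) - Integer.numberOfTrailingZeros(MIN_ALLOCATION);
  }

  public static void main(String[] args) {
    for (int requested : Arrays.asList(100, 4096, 5000, 1 << 20)) {
      int rounded = roundAllocationSize(requested);
      System.out.println(requested + " -> " + rounded + " (free list " + freeListIndex(rounded) + ")");
    }
  }
}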
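The retry loop added to allocateMultiple (fast retries first, then increasingly aggressive eviction requests bounded by the force-eviction budget, then the discard path) can be summarized by the following standalone sketch. Evictor, Allocation, tryAllocate, missingBytes and the two constants are hypothetical stand-ins for MemoryManager.evictMemory, allocateFast/allocateWithSplit, MAX_FAST_ATTEMPT and MAX_DISCARD_ATTEMPTS; this is a simplification, not the patch code.

/** Simplified, hypothetical sketch of the retry policy in allocateMultiple; not the actual BuddyAllocator code. */
final class ForcedEvictionRetrySketch {
  /** Stand-in for MemoryManager.evictMemory. */
  interface Evictor {
    long evict(long bytes);
  }

  /** Stand-in for the allocateFast/allocateWithSplit calls and the "missing bytes" bookkeeping. */
  interface Allocation {
    boolean tryAllocate();
    long missingBytes();
  }

  static final int MAX_FAST_ATTEMPTS = 5;   // fast retries before asking the evictor for help
  static final int MAX_EMPTY_ATTEMPTS = 10; // give up after this many evictions that freed nothing

  static boolean allocateWithRetries(Allocation alloc, Evictor evictor, long maxForcedEvictionBytes) {
    long totalEvicted = 0;
    int attempt = 0;
    int emptyAttempts = 0;
    while (totalEvicted < maxForcedEvictionBytes && emptyAttempts < MAX_EMPTY_ATTEMPTS) {
      if (attempt >= MAX_FAST_ATTEMPTS) {
        // Ask for what is still missing, plus a growing surcharge to ride out allocation bursts.
        long request = alloc.missingBytes() * (1L + attempt - MAX_FAST_ATTEMPTS);
        long evicted = evictor.evict(request);
        totalEvicted += evicted;
        if (evicted == 0) {
          emptyAttempts++;
        }
      }
      if (alloc.tryAllocate()) {
        return true;
      }
      attempt++;
    }
    return false; // the caller falls back to the freelist/brute-force discard path
  }
}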
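The new hive.llap.io.allocator.max.force.eviction setting is a size value parsed like the other LLAP allocator sizes. A minimal usage sketch follows; the 32Mb value and the class name are arbitrary examples, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

public class MaxForceEvictionConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Raise the force-eviction budget from the default 16Mb to an arbitrary 32Mb (example value only).
    conf.set(HiveConf.ConfVars.LLAP_ALLOCATOR_MAX_FORCE_EVICTED.varname, "32Mb");
    long bytes = HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAX_FORCE_EVICTED);
    System.out.println("hive.llap.io.allocator.max.force.eviction = " + bytes + " bytes");
  }
}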