diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index 60d56d6511..4297cfc61d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -133,4 +133,16 @@ public void releaseMemory(final long memoryToRelease) { public void updateMaxSize(long maxSize) { this.maxSize = maxSize; } + + public long purge() { + if (evictor == null) return 0; + long evicted = evictor.purge(); + if (evicted == 0) return 0; + long usedMem = -1; + do { + usedMem = usedMemory.get(); + } while (!usedMemory.compareAndSet(usedMem, usedMem - evicted)); + metrics.incrCacheCapacityUsed(-evicted); + return evicted; + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 82e19346bd..7c8eb6e410 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -57,7 +57,8 @@ private final double expirePriority(long time, long lastAccess, double previous) * Perhaps we should use ConcurrentDoubleLinkedList (in public domain). * ONLY LIST REMOVAL is allowed under list lock. */ - private final LlapCacheableBuffer[] heap; + private LlapCacheableBuffer[] heap; + private final Object heapLock = new Object(); private final ReentrantLock listLock = new ReentrantLock(); private LlapCacheableBuffer listHead, listTail; /** Number of elements. */ @@ -120,7 +121,7 @@ public void notifyUnlock(LlapCacheableBuffer buffer) { if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time); } - synchronized (heap) { + synchronized (heapLock) { // First, update buffer priority - we have just been using it. buffer.priority = (buffer.lastUpdate == -1) ? F0 : touchPriority(time, buffer.lastUpdate, buffer.priority); @@ -174,11 +175,75 @@ public void setEvictionListener(EvictionListener listener) { @Override public long purge() { - long evicted = evictSomeBlocks(Long.MAX_VALUE); - LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy", LlapUtil.humanReadableByteCount(evicted)); + long evicted = 0; + LlapCacheableBuffer oldTail = null; + listLock.lock(); + try { + LlapCacheableBuffer current = oldTail = listTail; + while (current != null) { + boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK != current.invalidate(); + current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; + if (canEvict) { + current = current.prev; + } else { + // Remove from the list. + LlapCacheableBuffer newCurrent = current.prev; + oldTail = removeFromLocalList(oldTail, current); + current = newCurrent; + } + } + listHead = listTail = null; + } finally { + listLock.unlock(); + } + + LlapCacheableBuffer[] oldHeap = null; + int oldHeapSize = -1; + synchronized (heapLock) { + oldHeap = heap; + oldHeapSize = heapSize; + heap = new LlapCacheableBuffer[heap.length]; + heapSize = 0; + for (int i = 0; i < oldHeapSize; ++i) { + LlapCacheableBuffer result = oldHeap[i]; + result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; + int invalidateResult = result.invalidate(); + if (invalidateResult != LlapCacheableBuffer.INVALIDATE_OK) { + oldHeap[i] = null; // Removed from heap without evicting. + } + } + } + LlapCacheableBuffer current = oldTail; + while (current != null) { + evicted += current.getMemoryUsage(); + evictionListener.notifyEvicted(current); + current = current.prev; + } + for (int i = 0; i < oldHeapSize; ++i) { + current = oldHeap[i]; + if (current == null) continue; + evicted += current.getMemoryUsage(); + evictionListener.notifyEvicted(current); + } + LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy", + LlapUtil.humanReadableByteCount(evicted)); return evicted; } + private static LlapCacheableBuffer removeFromLocalList( + LlapCacheableBuffer tail, LlapCacheableBuffer current) { + if (current == tail) { + tail = current.prev; + } else { + current.next.prev = current.prev; + } + if (current.prev != null) { + current.prev.next = current.next; + } + current.prev = current.next = null; + return tail; + } + @Override public long evictSomeBlocks(long memoryToReserve) { @@ -190,7 +255,7 @@ public long evictSomeBlocks(long memoryToReserve) { long time = timer.get(); while (evicted < memoryToReserve) { LlapCacheableBuffer buffer = null; - synchronized (heap) { + synchronized (heapLock) { buffer = evictFromHeapUnderLock(time); } if (buffer == null) return evicted; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index e1e8a32aa6..2fffeb876e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -102,7 +102,7 @@ private final LowLevelCache dataCache; private final BufferUsageManager bufferManager; private final Configuration daemonConf; - private LowLevelCachePolicy cachePolicyWrapper; + private final LowLevelCacheMemoryManager memoryManager; private List debugDumpComponents = new ArrayList<>(); @@ -147,17 +147,18 @@ private LlapIoImpl(Configuration conf) throws IOException { LowLevelCachePolicy realCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy( minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy(); boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE); + LowLevelCachePolicy cachePolicyWrapper; if (trackUsage) { - this.cachePolicyWrapper = new CacheContentsTracker(realCachePolicy); + cachePolicyWrapper = new CacheContentsTracker(realCachePolicy); } else { - this.cachePolicyWrapper = realCachePolicy; + cachePolicyWrapper = realCachePolicy; } // Allocator uses memory manager to request memory, so create the manager next. - LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager( + this.memoryManager = new LowLevelCacheMemoryManager( totalMemorySize, cachePolicyWrapper, cacheMetrics); cacheMetrics.setCacheCapacityTotal(totalMemorySize); // Cache uses allocator to allocate and deallocate, create allocator and then caches. - BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics); + BuddyAllocator allocator = new BuddyAllocator(conf, memoryManager, cacheMetrics); this.allocator = allocator; LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl( cacheMetrics, cachePolicyWrapper, allocator, true); @@ -170,7 +171,7 @@ private LlapIoImpl(Configuration conf) throws IOException { boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE); metadataCache = new MetadataCache( - allocator, memManager, cachePolicyWrapper, useGapCache, cacheMetrics); + allocator, memoryManager, cachePolicyWrapper, useGapCache, cacheMetrics); fileMetadataCache = metadataCache; // And finally cache policy uses cache to notify it of eviction. The cycle is complete! EvictionDispatcher e = new EvictionDispatcher( @@ -198,6 +199,7 @@ private LlapIoImpl(Configuration conf) throws IOException { SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics); bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm; dataCache = sbm; + this.memoryManager = null; debugDumpComponents.add(new LlapIoDebugDump() { @Override public void debugDumpShort(StringBuilder sb) { @@ -234,8 +236,8 @@ public String getMemoryInfo() { @Override public long purge() { - if (cachePolicyWrapper != null) { - return cachePolicyWrapper.purge(); + if (memoryManager != null) { + return memoryManager.purge(); } return 0; } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 99841aa380..6eb2eb5089 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -179,6 +179,46 @@ public void testLruExtreme() { } @Test + public void testPurge() { + final int HEAP_SIZE = 32; + Configuration conf = new Configuration(); + conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); + EvictionTracker et = new EvictionTracker(); + LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, HEAP_SIZE, conf); + MetricsMock m = createMetricsMock(); + LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager( + HEAP_SIZE, lrfu, m.metricsMock); + lrfu.setEvictionListener(et); + assertEquals(0, lrfu.purge()); + for (int testSize = 1; testSize <= HEAP_SIZE; ++testSize) { + LOG.info("Starting with " + testSize); + ArrayList purge = new ArrayList(testSize), + dontPurge = new ArrayList(testSize); + for (int i = 0; i < testSize; ++i) { + LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake(); + assertTrue(cache(mm, lrfu, et, buffer)); + // Lock a few blocks without telling the policy. + if ((i + 1) % 3 == 0) { + buffer.incRef(); + dontPurge.add(buffer); + } else { + purge.add(buffer); + } + } + lrfu.purge(); + for (LlapDataBuffer buffer : purge) { + assertTrue(buffer + " " + testSize, buffer.isInvalid()); + mm.releaseMemory(buffer.getMemoryUsage()); + } + for (LlapDataBuffer buffer : dontPurge) { + assertFalse(buffer.isInvalid()); + buffer.decRef(); + mm.releaseMemory(buffer.getMemoryUsage()); + } + } + } + + @Test public void testDeadlockResolution() { int heapSize = 4; LOG.info("Testing deadlock resolution");