diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 759819da40fa286b29c4efc21178be21f21f486b..b1da4230733d75876aafc30bf6951e15f2e1c974 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -204,21 +204,33 @@ public long purge() { listLock.lock(); try { LlapCacheableBuffer current = listTail; + LlapCacheableBuffer lockedHead = null; + LlapCacheableBuffer lockedTail = null; oldTail = listTail; while (current != null) { - boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK == current.invalidate(); + int invalidateResult = current.invalidate(); current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; - if (canEvict) { + if (invalidateResult == LlapCacheableBuffer.INVALIDATE_OK) { current = current.prev; } else { - // Remove from the list. LlapCacheableBuffer newCurrent = current.prev; oldTail = removeFromLocalList(oldTail, current); + if (invalidateResult == LlapCacheableBuffer.INVALIDATE_FAILED) { + if (lockedHead != null) { + current.next = lockedHead; + lockedHead.prev = current; + lockedHead = current; + } else { + lockedHead = current; + lockedTail = current; + current.next = null; + } + } current = newCurrent; } } - listHead = null; - listTail = null; + listHead = lockedHead; + listTail = lockedTail; } finally { listLock.unlock(); } @@ -235,7 +247,12 @@ public long purge() { result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; int invalidateResult = result.invalidate(); if (invalidateResult != LlapCacheableBuffer.INVALIDATE_OK) { - oldHeap[i] = null; // Removed from heap without evicting. + oldHeap[i] = null; // Either can't be evicted or was already evicted. + if (invalidateResult == LlapCacheableBuffer.INVALIDATE_FAILED) { + result.indexInHeap = heapSize; + heapifyUpUnderLock(result, result.lastUpdate); + ++heapSize; + } } } } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 923042d88c465af700ec58e1464ecc8ff3c3fcab..97e4b200521ffe6ae2054bb6b0a329338fa142f7 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -205,16 +205,30 @@ public void testPurge() { purge.add(buffer); } } + // Only unlocked buffers should be evicted upon this purge invocation: lrfu.purge(); + // Due to the nature of test we have to release evicted memory manually. + // Normally this would be triggered by eviction. for (LlapDataBuffer buffer : purge) { assertTrue(buffer + " " + testSize, buffer.isInvalid()); mm.releaseMemory(buffer.getMemoryUsage()); } + + + // Now unlock the buffers that remained. for (LlapDataBuffer buffer : dontPurge) { assertFalse(buffer.isInvalid()); buffer.decRef(); + } + // The 2nd purge call should find the newly unlocked buffers and evict them. + lrfu.purge(); + // Manual release... + for (LlapDataBuffer buffer : dontPurge) { mm.releaseMemory(buffer.getMemoryUsage()); } + // All test buffers should have been evicted by now. + assertEquals("Some buffers remained un-evicted!", testSize, et.evicted.size()); + et.evicted.clear(); } }