diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8aaf5467fe..91622533ba 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -359,6 +359,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_ALLOCATOR_DIRECT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_USE_LRFU.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_LAMBDA.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_USE_FILEID_PATH.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS.varname); @@ -4119,12 +4120,15 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "partitions and tables) for reporting."), LLAP_USE_LRFU("hive.llap.io.use.lrfu", true, "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."), - LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.000001f, + LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.1f, "Lambda for ORC low-level cache LRFU cache policy. Must be in [0, 1]. 0 makes LRFU\n" + "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly.\n" + "The meaning of this parameter is the inverse of the number of time ticks (cache\n" + " operations, currently) that cause the combined recency-frequency of a block in cache\n" + " to be halved."), + LLAP_LRFU_BP_WRAPPER_SIZE("hive.llap.io.lrfu.bp.wrapper.size", 64, "thread local queue " + + "used to amortize the lock contention, the idea hear is to try locking as soon we reach max size / 2 " + + "and block when max queue size reached"), LLAP_CACHE_ALLOW_SYNTHETIC_FILEID("hive.llap.cache.allow.synthetic.fileid", true, "Whether LLAP cache should use synthetic file ID if real one is not available. Systems\n" + "like HDFS, Isilon, etc. provide a unique file/inode ID. On other FSes (e.g. local\n" + diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 759819da40..d1d6acd398 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -42,7 +42,7 @@ * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies". * Additionally, buffer locking has to be handled (locked buffer cannot be evicted). */ -public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { +public final class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { private final double lambda; private double f(long x) { return Math.pow(0.5, lambda * x); @@ -67,7 +67,7 @@ private double expirePriority(long time, long lastAccess, double previous) { * ONLY LIST REMOVAL is allowed under list lock. */ private LlapCacheableBuffer[] heap; - private final Object heapLock = new Object(); + private final ReentrantLock heapLock = new ReentrantLock(); private final ReentrantLock listLock = new ReentrantLock(); private LlapCacheableBuffer listHead, listTail; /** Number of elements. */ @@ -75,9 +75,15 @@ private double expirePriority(long time, long lastAccess, double previous) { private final int maxHeapSize; private EvictionListener evictionListener; private final PolicyMetrics metrics; + private final ThreadLocal threadLocalBuffers; + private final ThreadLocal threadLocalCount; + private final int maxQueueSize; public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) { - lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); + + this.maxQueueSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE); + this.lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); + int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize); if (lambda == 0) { maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case @@ -99,8 +105,9 @@ public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration co // register new metrics provider for this cache policy metrics = new PolicyMetrics(sessID); - LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(), - null, metrics); + LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(), null, metrics); + threadLocalBuffers = ThreadLocal.withInitial(() -> new LlapCacheableBuffer[maxQueueSize]); + threadLocalCount = ThreadLocal.withInitial(() -> 0); } @Override @@ -140,11 +147,47 @@ public void notifyLock(LlapCacheableBuffer buffer) { @Override public void notifyUnlock(LlapCacheableBuffer buffer) { - long time = timer.incrementAndGet(); - if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { - LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time); + + int count = threadLocalCount.get(); + final LlapCacheableBuffer[] cacheableBuffers = threadLocalBuffers.get() ; + if (count < maxQueueSize) { + cacheableBuffers[count] = buffer; + threadLocalCount.set(++count); + } + if (count <= maxQueueSize / 2) { + // case too early to flush + return; + } + + if (count == maxQueueSize) { + // case we have to flush thus block on heap lock + heapLock.lock(); + try { + doNotifyUnderHeapLock(count, cacheableBuffers); + } finally { + threadLocalCount.set(0); + heapLock.unlock(); + } + return; } - synchronized (heapLock) { + if (heapLock.tryLock()) { + try { + doNotifyUnderHeapLock(count, cacheableBuffers); + } finally { + threadLocalCount.set(0); + heapLock.unlock(); + } + } + } + + private void doNotifyUnderHeapLock(int count, LlapCacheableBuffer[] cacheableBuffers) { + LlapCacheableBuffer buffer; + for (int i = 0; i < count; i++) { + buffer = cacheableBuffers[i]; + long time = timer.incrementAndGet(); + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time); + } // First, update buffer priority - we have just been using it. buffer.priority = (buffer.lastUpdate == -1) ? F0 : touchPriority(time, buffer.lastUpdate, buffer.priority); @@ -200,7 +243,7 @@ public void setEvictionListener(EvictionListener listener) { @Override public long purge() { long evicted = 0; - LlapCacheableBuffer oldTail = null; + LlapCacheableBuffer oldTail; listLock.lock(); try { LlapCacheableBuffer current = listTail; @@ -223,9 +266,10 @@ public long purge() { listLock.unlock(); } - LlapCacheableBuffer[] oldHeap = null; - int oldHeapSize = -1; - synchronized (heapLock) { + LlapCacheableBuffer[] oldHeap; + int oldHeapSize; + heapLock.lock(); + try { oldHeap = heap; oldHeapSize = heapSize; heap = new LlapCacheableBuffer[maxHeapSize]; @@ -238,6 +282,8 @@ public long purge() { oldHeap[i] = null; // Removed from heap without evicting. } } + } finally { + heapLock.unlock(); } LlapCacheableBuffer current = oldTail; while (current != null) { @@ -285,9 +331,12 @@ public long evictSomeBlocks(long memoryToReserve) { // there's a small number of buffers and they all live in the heap). long time = timer.get(); while (evicted < memoryToReserve) { - LlapCacheableBuffer buffer = null; - synchronized (heapLock) { + LlapCacheableBuffer buffer; + heapLock.lock(); + try { buffer = evictFromHeapUnderLock(time); + } finally { + heapLock.unlock(); } if (buffer == null) { return evicted; @@ -300,7 +349,7 @@ public long evictSomeBlocks(long memoryToReserve) { private long evictFromList(long memoryToReserve) { long evicted = 0; - LlapCacheableBuffer nextCandidate = null, firstCandidate = null; + LlapCacheableBuffer nextCandidate, firstCandidate; listLock.lock(); // We assume that there are no locked blocks in the list; or if they are, they can be dropped. // Therefore we always evict one contiguous sequence from the tail. We can find it in one pass, @@ -702,7 +751,8 @@ public String description() { long lockedMeta = 0L; // number of bytes in locked metadata buffers // aggregate values on the heap - synchronized (heapLock) { + heapLock.lock(); + try { for (int heapIdx = 0; heapIdx < heapSize; ++heapIdx) { LlapCacheableBuffer buff = heap[heapIdx]; @@ -720,6 +770,8 @@ public String description() { } } } + } finally { + heapLock.unlock(); } // aggregate values on the evicition short list diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 0d9077c368..fadefa20bc 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -144,9 +144,10 @@ private LlapIoImpl(Configuration conf) throws IOException { // Memory manager uses cache policy to trigger evictions, so create the policy first. boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU); long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); - int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); - LowLevelCachePolicy realCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy( - minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy(); + int minAllocSize = (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); + LowLevelCachePolicy + realCachePolicy = + useLrfu ? new LowLevelLrfuCachePolicy(minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy(); boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE); LowLevelCachePolicy cachePolicyWrapper; if (trackUsage) { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java index f4d9057cc2..d7f02d43bf 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java @@ -57,6 +57,7 @@ Configuration conf = new Configuration(); // Set lambda to 1 so the heap size becomes 1 (LRU). conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); int minBufferSize = 1; cachePolicy = new LowLevelLrfuCachePolicy(minBufferSize, maxSize, conf); memoryManager = new LowLevelCacheMemoryManager(maxSize, cachePolicy, CACHE_METRICS); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 923042d88c..fbe58ff919 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -57,6 +57,7 @@ public void testRegression_HIVE_12178() throws Exception { Configuration conf = new Configuration(); // Set lambda to 1 so the heap size becomes 1 (LRU). conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); final LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, memSize, conf); Field f = LowLevelLrfuCachePolicy.class.getDeclaredField("listLock"); f.setAccessible(true); @@ -127,6 +128,7 @@ public void testLfuExtreme() { Configuration conf = new Configuration(); ArrayList inserted = new ArrayList(heapSize); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(1, heapSize, conf); LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lfu, @@ -157,6 +159,7 @@ public void testLruExtreme() { Configuration conf = new Configuration(); ArrayList inserted = new ArrayList(heapSize); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(1, heapSize, conf); LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lru, @@ -183,6 +186,7 @@ public void testPurge() { final int HEAP_SIZE = 32; Configuration conf = new Configuration(); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, HEAP_SIZE, conf); MetricsMock m = createMetricsMock(); @@ -225,6 +229,7 @@ public void testDeadlockResolution() { ArrayList inserted = new ArrayList(heapSize); EvictionTracker et = new EvictionTracker(); Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf); LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu, LlapDaemonCacheMetrics.create("test", "1")); @@ -302,6 +307,7 @@ private void testHeapSize(int heapSize) { Configuration conf = new Configuration(); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements EvictionTracker et = new EvictionTracker(); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf); MetricsMock m = createMetricsMock(); LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu, m.metricsMock);