diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8aaf5467fe..91622533ba 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -359,6 +359,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_ALLOCATOR_DIRECT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_USE_LRFU.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_LAMBDA.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_USE_FILEID_PATH.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS.varname); @@ -4119,12 +4120,15 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "partitions and tables) for reporting."), LLAP_USE_LRFU("hive.llap.io.use.lrfu", true, "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."), - LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.000001f, + LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.1f, "Lambda for ORC low-level cache LRFU cache policy. Must be in [0, 1]. 0 makes LRFU\n" + "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly.\n" + "The meaning of this parameter is the inverse of the number of time ticks (cache\n" + " operations, currently) that cause the combined recency-frequency of a block in cache\n" + " to be halved."), + LLAP_LRFU_BP_WRAPPER_SIZE("hive.llap.io.lrfu.bp.wrapper.size", 64, "thread local queue " + + "used to amortize the lock contention, the idea hear is to try locking as soon we reach max size / 2 " + + "and block when max queue size reached"), LLAP_CACHE_ALLOW_SYNTHETIC_FILEID("hive.llap.cache.allow.synthetic.fileid", true, "Whether LLAP cache should use synthetic file ID if real one is not available. Systems\n" + "like HDFS, Isilon, etc. provide a unique file/inode ID. On other FSes (e.g. local\n" + diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 759819da40..5e0ba607ac 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -42,7 +43,7 @@ * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies". * Additionally, buffer locking has to be handled (locked buffer cannot be evicted). */ -public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { +public final class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { private final double lambda; private double f(long x) { return Math.pow(0.5, lambda * x); @@ -67,7 +68,7 @@ private double expirePriority(long time, long lastAccess, double previous) { * ONLY LIST REMOVAL is allowed under list lock. */ private LlapCacheableBuffer[] heap; - private final Object heapLock = new Object(); + private final ReentrantLock heapLock = new ReentrantLock(); private final ReentrantLock listLock = new ReentrantLock(); private LlapCacheableBuffer listHead, listTail; /** Number of elements. */ @@ -75,9 +76,15 @@ private double expirePriority(long time, long lastAccess, double previous) { private final int maxHeapSize; private EvictionListener evictionListener; private final PolicyMetrics metrics; + private final ThreadLocal threadLocalBuffers; + private final ThreadLocal threadLocalCount; + private final int maxQueueSize; public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) { - lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); + + this.maxQueueSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE); + this.lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); + int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize); if (lambda == 0) { maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case @@ -99,8 +106,9 @@ public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration co // register new metrics provider for this cache policy metrics = new PolicyMetrics(sessID); - LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(), - null, metrics); + LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(), null, metrics); + threadLocalBuffers = ThreadLocal.withInitial(() -> new LlapCacheableBuffer[maxQueueSize]); + threadLocalCount = ThreadLocal.withInitial(() -> 0); } @Override @@ -140,55 +148,80 @@ public void notifyLock(LlapCacheableBuffer buffer) { @Override public void notifyUnlock(LlapCacheableBuffer buffer) { - long time = timer.incrementAndGet(); - if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { - LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time); - } - synchronized (heapLock) { - // First, update buffer priority - we have just been using it. - buffer.priority = (buffer.lastUpdate == -1) ? F0 - : touchPriority(time, buffer.lastUpdate, buffer.priority); - buffer.lastUpdate = time; - // Then, if the buffer was in the list, remove it. - if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) { - listLock.lock(); - removeFromListAndUnlock(buffer); - } - // The only concurrent change that can happen when we hold the heap lock is list removal; - // we have just ensured the item is not in the list, so we have a definite state now. - if (buffer.indexInHeap >= 0) { - // The buffer has lived in the heap all along. Restore heap property. - heapifyDownUnderLock(buffer, time); - } else if (heapSize == heap.length) { - // The buffer is not in the (full) heap. Demote the top item of the heap into the list. - LlapCacheableBuffer demoted = heap[0]; - listLock.lock(); - try { - assert demoted.indexInHeap == 0; // Noone could have moved it, we have the heap lock. - demoted.indexInHeap = LlapCacheableBuffer.IN_LIST; - demoted.prev = null; - if (listHead != null) { - demoted.next = listHead; - listHead.prev = demoted; - listHead = demoted; - } else { - listHead = demoted; - listTail = demoted; - demoted.next = null; + + int count = threadLocalCount.get(); + if (count < maxQueueSize) { + threadLocalBuffers.get()[count] = buffer; + threadLocalCount.set(++count); + } + if (count <= maxQueueSize / 2) { + // case too early to flush + return; + } + + if (count == maxQueueSize) { + // case we have to flush thus block on heap lock + heapLock.lock(); + } else if (!heapLock.tryLock()) { + // try to block if not keep going + return; + } + try { + for (int i = 0; i < count; i++) { + buffer = threadLocalBuffers.get()[i]; + long time = timer.incrementAndGet(); + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time); + } + // First, update buffer priority - we have just been using it. + buffer.priority = (buffer.lastUpdate == -1) ? F0 + : touchPriority(time, buffer.lastUpdate, buffer.priority); + buffer.lastUpdate = time; + // Then, if the buffer was in the list, remove it. + if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) { + listLock.lock(); + removeFromListAndUnlock(buffer); + } + // The only concurrent change that can happen when we hold the heap lock is list removal; + // we have just ensured the item is not in the list, so we have a definite state now. + if (buffer.indexInHeap >= 0) { + // The buffer has lived in the heap all along. Restore heap property. + heapifyDownUnderLock(buffer, time); + } else if (heapSize == heap.length) { + // The buffer is not in the (full) heap. Demote the top item of the heap into the list. + LlapCacheableBuffer demoted = heap[0]; + listLock.lock(); + try { + assert demoted.indexInHeap == 0; // Noone could have moved it, we have the heap lock. + demoted.indexInHeap = LlapCacheableBuffer.IN_LIST; + demoted.prev = null; + if (listHead != null) { + demoted.next = listHead; + listHead.prev = demoted; + listHead = demoted; + } else { + listHead = demoted; + listTail = demoted; + demoted.next = null; + } + } finally { + listLock.unlock(); } - } finally { - listLock.unlock(); + // Now insert the new buffer in its place and restore heap property. + buffer.indexInHeap = 0; + heapifyDownUnderLock(buffer, time); + } else { + // Heap is not full, add the buffer to the heap and restore heap property up. + assert heapSize < heap.length : heap.length + " < " + heapSize; + buffer.indexInHeap = heapSize; + heapifyUpUnderLock(buffer, time); + ++heapSize; } - // Now insert the new buffer in its place and restore heap property. - buffer.indexInHeap = 0; - heapifyDownUnderLock(buffer, time); - } else { - // Heap is not full, add the buffer to the heap and restore heap property up. - assert heapSize < heap.length : heap.length + " < " + heapSize; - buffer.indexInHeap = heapSize; - heapifyUpUnderLock(buffer, time); - ++heapSize; } + + } finally { + threadLocalCount.set(0); + heapLock.unlock(); } } @@ -225,7 +258,8 @@ public long purge() { LlapCacheableBuffer[] oldHeap = null; int oldHeapSize = -1; - synchronized (heapLock) { + heapLock.lock(); + try { oldHeap = heap; oldHeapSize = heapSize; heap = new LlapCacheableBuffer[maxHeapSize]; @@ -238,6 +272,8 @@ public long purge() { oldHeap[i] = null; // Removed from heap without evicting. } } + } finally { + heapLock.unlock(); } LlapCacheableBuffer current = oldTail; while (current != null) { @@ -286,8 +322,11 @@ public long evictSomeBlocks(long memoryToReserve) { long time = timer.get(); while (evicted < memoryToReserve) { LlapCacheableBuffer buffer = null; - synchronized (heapLock) { + heapLock.lock(); + try { buffer = evictFromHeapUnderLock(time); + } finally { + heapLock.unlock(); } if (buffer == null) { return evicted; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 0d9077c368..fadefa20bc 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -144,9 +144,10 @@ private LlapIoImpl(Configuration conf) throws IOException { // Memory manager uses cache policy to trigger evictions, so create the policy first. boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU); long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); - int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); - LowLevelCachePolicy realCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy( - minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy(); + int minAllocSize = (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); + LowLevelCachePolicy + realCachePolicy = + useLrfu ? new LowLevelLrfuCachePolicy(minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy(); boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE); LowLevelCachePolicy cachePolicyWrapper; if (trackUsage) { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java index f4d9057cc2..d7f02d43bf 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java @@ -57,6 +57,7 @@ Configuration conf = new Configuration(); // Set lambda to 1 so the heap size becomes 1 (LRU). conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); int minBufferSize = 1; cachePolicy = new LowLevelLrfuCachePolicy(minBufferSize, maxSize, conf); memoryManager = new LowLevelCacheMemoryManager(maxSize, cachePolicy, CACHE_METRICS); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 923042d88c..fbe58ff919 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -57,6 +57,7 @@ public void testRegression_HIVE_12178() throws Exception { Configuration conf = new Configuration(); // Set lambda to 1 so the heap size becomes 1 (LRU). conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); final LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, memSize, conf); Field f = LowLevelLrfuCachePolicy.class.getDeclaredField("listLock"); f.setAccessible(true); @@ -127,6 +128,7 @@ public void testLfuExtreme() { Configuration conf = new Configuration(); ArrayList inserted = new ArrayList(heapSize); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(1, heapSize, conf); LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lfu, @@ -157,6 +159,7 @@ public void testLruExtreme() { Configuration conf = new Configuration(); ArrayList inserted = new ArrayList(heapSize); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(1, heapSize, conf); LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lru, @@ -183,6 +186,7 @@ public void testPurge() { final int HEAP_SIZE = 32; Configuration conf = new Configuration(); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, HEAP_SIZE, conf); MetricsMock m = createMetricsMock(); @@ -225,6 +229,7 @@ public void testDeadlockResolution() { ArrayList inserted = new ArrayList(heapSize); EvictionTracker et = new EvictionTracker(); Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf); LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu, LlapDaemonCacheMetrics.create("test", "1")); @@ -302,6 +307,7 @@ private void testHeapSize(int heapSize) { Configuration conf = new Configuration(); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements EvictionTracker et = new EvictionTracker(); + conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf); MetricsMock m = createMetricsMock(); LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu, m.metricsMock);