diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 704f2f14d3..759819da40 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -27,6 +27,15 @@ import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.impl.MsInfo; /** * Implementation of the algorithm from "On the Existence of a Spectrum of Policies @@ -35,14 +44,14 @@ */ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { private final double lambda; - private final double f(long x) { + private double f(long x) { return Math.pow(0.5, lambda * x); } private static final double F0 = 1; // f(0) is always 1 - private final double touchPriority(long time, long lastAccess, double previous) { + private double touchPriority(long time, long lastAccess, double previous) { return F0 + f(time - lastAccess) * previous; } - private final double expirePriority(long time, long lastAccess, double previous) { + private double expirePriority(long time, long lastAccess, double previous) { return f(time - lastAccess) * previous; } @@ -65,6 +74,7 @@ private final double expirePriority(long time, long lastAccess, double previous) private int 
heapSize = 0; private final int maxHeapSize; private EvictionListener evictionListener; + private final PolicyMetrics metrics; public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) { lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); @@ -79,7 +89,18 @@ public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration co minBufferSize, lambda, maxHeapSize); heap = new LlapCacheableBuffer[maxHeapSize]; - listHead = listTail = null; + listHead = null; + listTail = null; + + String sessID = conf.get("llap.daemon.metrics.sessionid"); + if (null == sessID) { + sessID = ""; + } + + // register new metrics provider for this cache policy + metrics = new PolicyMetrics(sessID); + LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(), + null, metrics); } @Override @@ -110,8 +131,10 @@ public void notifyLock(LlapCacheableBuffer buffer) { // a locked item in either, it will remove it from cache; when we unlock, we are going to // put it back or update it, depending on whether this has happened. This should cause // most of the expensive cache update work to happen in unlock, not blocking processing. 
- if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return; - if (!listLock.tryLock()) return; + if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST || !listLock.tryLock()) { + return; + } + removeFromListAndUnlock(buffer); } @@ -149,7 +172,8 @@ public void notifyUnlock(LlapCacheableBuffer buffer) { listHead.prev = demoted; listHead = demoted; } else { - listHead = listTail = demoted; + listHead = demoted; + listTail = demoted; demoted.next = null; } } finally { @@ -179,7 +203,8 @@ public long purge() { LlapCacheableBuffer oldTail = null; listLock.lock(); try { - LlapCacheableBuffer current = oldTail = listTail; + LlapCacheableBuffer current = listTail; + oldTail = listTail; while (current != null) { boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK == current.invalidate(); current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; @@ -192,7 +217,8 @@ public long purge() { current = newCurrent; } } - listHead = listTail = null; + listHead = null; + listTail = null; } finally { listLock.unlock(); } @@ -221,7 +247,9 @@ public long purge() { } for (int i = 0; i < oldHeapSize; ++i) { current = oldHeap[i]; - if (current == null) continue; + if (current == null) { + continue; + } evicted += current.getMemoryUsage(); evictionListener.notifyEvicted(current); } @@ -240,7 +268,8 @@ private static LlapCacheableBuffer removeFromLocalList( if (current.prev != null) { current.prev.next = current.next; } - current.prev = current.next = null; + current.prev = null; + current.next = null; return tail; } @@ -249,7 +278,9 @@ private static LlapCacheableBuffer removeFromLocalList( public long evictSomeBlocks(long memoryToReserve) { // In normal case, we evict the items from the list. 
long evicted = evictFromList(memoryToReserve); - if (evicted >= memoryToReserve) return evicted; + if (evicted >= memoryToReserve) { + return evicted; + } // This should not happen unless we are evicting a lot at once, or buffers are large (so // there's a small number of buffers and they all live in the heap). long time = timer.get(); @@ -258,7 +289,9 @@ public long evictSomeBlocks(long memoryToReserve) { synchronized (heapLock) { buffer = evictFromHeapUnderLock(time); } - if (buffer == null) return evicted; + if (buffer == null) { + return evicted; + } evicted += buffer.getMemoryUsage(); evictionListener.notifyEvicted(buffer); } @@ -273,7 +306,8 @@ private long evictFromList(long memoryToReserve) { // Therefore we always evict one contiguous sequence from the tail. We can find it in one pass, // splice it out and then finalize the eviction outside of the list lock. try { - nextCandidate = firstCandidate = listTail; + nextCandidate = listTail; + firstCandidate = listTail; while (evicted < memoryToReserve && nextCandidate != null) { if (LlapCacheableBuffer.INVALIDATE_OK != nextCandidate.invalidate()) { // Locked, or invalidated, buffer was in the list - just drop it; @@ -293,7 +327,8 @@ private long evictFromList(long memoryToReserve) { } if (firstCandidate != nextCandidate) { if (nextCandidate == null) { - listHead = listTail = null; // We have evicted the entire list. + listHead = null; + listTail = null; // We have evicted the entire list. } else { // Splice the section that we have evicted out of the list. // We have already updated the state above so no need to do that again. @@ -313,9 +348,13 @@ private long evictFromList(long memoryToReserve) { // Note: rarely called (unless buffers are very large or we evict a lot, or in LFU case). 
private LlapCacheableBuffer evictFromHeapUnderLock(long time) { while (true) { - if (heapSize == 0) return null; + if (heapSize == 0) { + return null; + } LlapCacheableBuffer result = evictHeapElementUnderLock(time, 0); - if (result != null) return result; + if (result != null) { + return result; + } } } @@ -324,11 +363,15 @@ private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) { int ix = buffer.indexInHeap; double priority = buffer.priority; while (true) { - if (ix == 0) break; // Buffer is at the top of the heap. + if (ix == 0) { + break; // Buffer is at the top of the heap. + } int parentIx = (ix - 1) >>> 1; LlapCacheableBuffer parent = heap[parentIx]; double parentPri = getHeapifyPriority(parent, time); - if (priority >= parentPri) break; + if (priority >= parentPri) { + break; + } heap[ix] = parent; parent.indexInHeap = ix; ix = parentIx; @@ -369,7 +412,9 @@ private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) { double priority = buffer.priority; while (true) { int newIx = moveMinChildUp(ix, time, priority); - if (newIx == -1) break; + if (newIx == -1) { + break; + } ix = newIx; } buffer.indexInHeap = ix; @@ -384,7 +429,9 @@ private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) { */ private int moveMinChildUp(int targetPos, long time, double comparePri) { int leftIx = (targetPos << 1) + 1, rightIx = leftIx + 1; - if (leftIx >= heapSize) return -1; // Buffer is at the leaf node. + if (leftIx >= heapSize) { + return -1; // Buffer is at the leaf node. 
+ } LlapCacheableBuffer left = heap[leftIx], right = null; if (rightIx < heapSize) { right = heap[rightIx]; @@ -405,7 +452,9 @@ private int moveMinChildUp(int targetPos, long time, double comparePri) { } private double getHeapifyPriority(LlapCacheableBuffer buf, long time) { - if (buf == null) return Double.MAX_VALUE; + if (buf == null) { + return Double.MAX_VALUE; + } if (buf.lastUpdate != time && time >= 0) { buf.priority = expirePriority(time, buf.lastUpdate, buf.priority); buf.lastUpdate = time; @@ -415,7 +464,9 @@ private double getHeapifyPriority(LlapCacheableBuffer buf, long time) { private void removeFromListAndUnlock(LlapCacheableBuffer buffer) { try { - if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return; + if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) { + return; + } removeFromListUnderLock(buffer); } finally { listLock.unlock(); @@ -547,20 +598,184 @@ private static void dumpList(StringBuilder result, @Override public void debugDumpShort(StringBuilder sb) { - sb.append("\nLRFU eviction list: "); - LlapCacheableBuffer listHeadLocal = listHead, listTailLocal = listTail; - if (listHeadLocal == null) { - sb.append("0 items"); - } else { - LlapCacheableBuffer listItem = listHeadLocal; - int c = 0; - while (listItem != null) { - ++c; - if (listItem == listTailLocal) break; - listItem = listItem.next; + long[] metricData = metrics.getUsageStats(); + sb.append("\nLRFU eviction list: ") + .append(metricData[PolicyMetrics.LISTSIZE]).append(" items"); + sb.append("\nLRFU eviction heap: ") + .append(heapSize).append(" items (of max ").append(maxHeapSize).append(")"); + sb.append("\nLRFU data on heap: ") + .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.DATAONHEAP])); + sb.append("\nLRFU metadata on heap: ") + .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.METAONHEAP])); + sb.append("\nLRFU data on eviction list: ") + .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.DATAONLIST])); + 
+ sb.append("\nLRFU metadata on eviction list: ") + .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.METAONLIST])); + sb.append("\nLRFU data locked: ") + .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.LOCKEDDATA])); + sb.append("\nLRFU metadata locked: ") + .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.LOCKEDMETA])); + } + + /** + * Metrics Information for LRFU specific policy information. + * This enumeration is used by the {@code PolicyMetrics} instance to + * define and describe the metrics. + */ + private enum PolicyInformation implements MetricsInfo { + PolicyMetrics("LRFU cache policy based metrics"), + DataOnHeap("Amount of bytes used for data on min-heap"), + DataOnList("Amount of bytes used for data on eviction short list"), + MetaOnHeap("Amount of bytes used for meta data on min-heap"), + MetaOnList("Amount of bytes used for meta data on eviction short list"), + DataLocked("Amount of locked data in bytes (in use)"), + MetaLocked("Amount of locked meta data in bytes (in use)"), + HeapSize("Number of buffers on the min-heap"), + HeapSizeMax("Capacity (number of buffers) of the min-heap"), + ListSize("Number of buffers on the eviction short list"), + TotalData("Total amount of bytes, used for data"), + TotalMeta("Total amount of bytes, used for meta data"); + + private final String description; // metric explanation + + /** + * Creates a new enum value. + * + * @param description The explanation of the metric + */ + PolicyInformation(String description) { + this.description = description; + } + + @Override + public String description() { + return description; + } + } + + /** + * Metrics provider for the LRFU cache policy. + * An instance of this class is providing JMX (through Hadoop metrics) + * statistics for the LRFU cache policy for monitoring. 
+ */ + @Metrics(about = "LRFU Cache Policy Metrics", context = "cache") + private class PolicyMetrics implements MetricsSource { + public static final int DATAONHEAP = 0; + public static final int DATAONLIST = 1; + public static final int METAONHEAP = 2; + public static final int METAONLIST = 3; + public static final int LISTSIZE = 4; + public static final int LOCKEDDATA = 5; + public static final int LOCKEDMETA = 6; + + private final String session; // identifier for the LLAP daemon + + /** + * Creates a new metrics producer. + * + * @param session The LLAP daemon identifier + */ + PolicyMetrics(String session) { + this.session = session; + } + + /** + * Helper to get some basic LRFU usage statistics (scans the heap under heapLock, then the list under listLock). + * This method returns a long array with the following content, indexed by the constants of this class: + * - [DATAONHEAP] amount of data (bytes) on min-heap + * - [DATAONLIST] amount of data (bytes) on eviction short list + * - [METAONHEAP] amount of metadata (bytes) on min-heap + * - [METAONLIST] amount of metadata (bytes) on eviction short list + * - [LISTSIZE] size (number of buffers) of the eviction short list + * - [LOCKEDDATA] amount of locked bytes for data + * - [LOCKEDMETA] amount of locked bytes for metadata + * + * @return long array with LRFU stats + */ + public long[] getUsageStats() { + long dataOnHeap = 0L; // all non-meta related buffers on min-heap + long dataOnList = 0L; // all non-meta related buffers on eviction list + long metaOnHeap = 0L; // meta data buffers on min-heap + long metaOnList = 0L; // meta data buffers on eviction list + long listSize = 0L; // number of entries on eviction list + long lockedData = 0L; // number of bytes in locked data buffers + long lockedMeta = 0L; // number of bytes in locked metadata buffers + + // aggregate values on the heap + synchronized (heapLock) { + for (int heapIdx = 0; heapIdx < heapSize; ++heapIdx) { + LlapCacheableBuffer buff = heap[heapIdx]; + + if (null != buff) { + if (buff instanceof LlapMetadataBuffer) { + metaOnHeap += buff.getMemoryUsage(); + if (buff.isLocked()) { + lockedMeta += buff.getMemoryUsage(); + } + } else { + dataOnHeap += 
buff.getMemoryUsage(); + if (buff.isLocked()) { + lockedData += buff.getMemoryUsage(); + } + } + } + } } - sb.append(c + " items"); + + // aggregate values on the eviction short list; lock before try so finally only unlocks a held lock + listLock.lock(); + try { + LlapCacheableBuffer scan = listHead; + while (null != scan) { + if (scan instanceof LlapMetadataBuffer) { + metaOnList += scan.getMemoryUsage(); + if (scan.isLocked()) { + lockedMeta += scan.getMemoryUsage(); + } + } else { + dataOnList += scan.getMemoryUsage(); + if (scan.isLocked()) { + lockedData += scan.getMemoryUsage(); + } + } + + ++listSize; + scan = scan.next; + } + } finally { + listLock.unlock(); + } + + return new long[] {dataOnHeap, dataOnList, + metaOnHeap, metaOnList, listSize, + lockedData, lockedMeta}; + } + + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + long[] usageStats = getUsageStats(); + + // start a new record + MetricsRecordBuilder mrb = collector.addRecord(PolicyInformation.PolicyMetrics) + .setContext("cache") + .tag(MsInfo.ProcessName, + MetricsUtils.METRICS_PROCESS_NAME) + .tag(MsInfo.SessionId, session); + + // add the values to the new record; NOTE(review): these values can decrease, so addGauge may fit better than addCounter - confirm with metric consumers + mrb.addCounter(PolicyInformation.DataOnHeap, usageStats[DATAONHEAP]) + .addCounter(PolicyInformation.DataOnList, usageStats[DATAONLIST]) + .addCounter(PolicyInformation.MetaOnHeap, usageStats[METAONHEAP]) + .addCounter(PolicyInformation.MetaOnList, usageStats[METAONLIST]) + .addCounter(PolicyInformation.DataLocked, usageStats[LOCKEDDATA]) + .addCounter(PolicyInformation.MetaLocked, usageStats[LOCKEDMETA]) + .addCounter(PolicyInformation.HeapSize, heapSize) + .addCounter(PolicyInformation.HeapSizeMax, maxHeapSize) + .addCounter(PolicyInformation.ListSize, usageStats[LISTSIZE]) + .addCounter(PolicyInformation.TotalData, usageStats[DATAONHEAP] + + usageStats[DATAONLIST]) + .addCounter(PolicyInformation.TotalMeta, usageStats[METAONHEAP] + + usageStats[METAONLIST]); } - sb.append("\nLRFU eviction heap: " + heapSize + " items"); } }