diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 704f2f14d3..537d986add 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -27,6 +27,15 @@
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.impl.MsInfo;
 
 /**
  * Implementation of the algorithm from "On the Existence of a Spectrum of Policies
@@ -65,6 +74,7 @@ private final double expirePriority(long time, long lastAccess, double previous)
   private int heapSize = 0;
   private final int maxHeapSize;
   private EvictionListener evictionListener;
+  private final PolicyMetrics metrics;
 
   public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) {
     lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
@@ -80,10 +90,25 @@ public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration co
 
     heap = new LlapCacheableBuffer[maxHeapSize];
     listHead = listTail = null;
+
+    String sessID = conf.get("llap.daemon.metrics.sessionid");
+    if (null == sessID) {
+      sessID = "";
+    }
+
+    // register new metrics provider for this cache policy
+    LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(),
+        null, metrics = new PolicyMetrics(sessID));
   }
 
   @Override
   public void cache(LlapCacheableBuffer buffer, Priority priority) {
+    // originally cached items don't get a notifyLock call, so we count their
+    // byte size here when they enter the cache.
+    if (buffer.isLocked()) {
+      metrics.incrementLocked(buffer);
+    }
+
     // LRFU cache policy doesn't store locked blocks. When we cache, the block is locked, so
     // we simply do nothing here. The fact that it was never updated will allow us to add it
     // properly on the first notifyUnlock.
@@ -105,18 +130,26 @@ public void cache(LlapCacheableBuffer buffer, Priority priority) {
 
   @Override
   public void notifyLock(LlapCacheableBuffer buffer) {
+    // add to locked counts in policy metrics
+    metrics.incrementLocked(buffer);
+
     // We do not proactively remove locked items from the heap, and opportunistically try to
     // remove from the list (since eviction is mostly from the list). If eviction stumbles upon
     // a locked item in either, it will remove it from cache; when we unlock, we are going to
     // put it back or update it, depending on whether this has happened. This should cause
     // most of the expensive cache update work to happen in unlock, not blocking processing.
-    if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
-    if (!listLock.tryLock()) return;
+    if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST)
+      return;
+    if (!listLock.tryLock())
+      return;
     removeFromListAndUnlock(buffer);
   }
 
   @Override
   public void notifyUnlock(LlapCacheableBuffer buffer) {
+    // decrement value in policy metrics
+    metrics.decrementLocked(buffer);
+
     long time = timer.incrementAndGet();
     if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
       LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
@@ -547,20 +580,202 @@ private static void dumpList(StringBuilder result,
 
   @Override
   public void debugDumpShort(StringBuilder sb) {
-    sb.append("\nLRFU eviction list: ");
-    LlapCacheableBuffer listHeadLocal = listHead, listTailLocal = listTail;
-    if (listHeadLocal == null) {
-      sb.append("0 items");
-    } else {
-      LlapCacheableBuffer listItem = listHeadLocal;
-      int c = 0;
-      while (listItem != null) {
-        ++c;
-        if (listItem == listTailLocal) break;
-        listItem = listItem.next;
+    long[] metricData = metrics.getUsageStats();
+    sb.append("\nLRFU eviction list: ")
+        .append(metricData[PolicyMetrics.LISTSIZE]).append(" items");
+    sb.append("\nLRFU eviction heap: ")
+        .append(heapSize).append(" items (of max ").append(maxHeapSize).append(")");
+    sb.append("\nLRFU data on heap: ")
+        .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.DATAONHEAP]));
+    sb.append("\nLRFU metadata on heap: ")
+        .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.METAONHEAP]));
+    sb.append("\nLRFU data on eviction list: ")
+        .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.DATAONLIST]));
+    sb.append("\nLRFU metadata on eviction list: ")
+        .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.METAONLIST]));
+    sb.append("\nLRFU data locked: ")
+        .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.LOCKEDDATA]));
+    sb.append("\nLRFU metadata locked: ")
+        .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.LOCKEDMETA]));
+  }
+
+  /**
+   * Metrics Information for LRFU specific policy information.
+   * This enumeration is used by the {@code PolicyMetrics} instance to
+   * define and describe the metrics.
+   */
+  private enum PolicyInformation implements MetricsInfo {
+    PolicyMetrics("LRFU cache policy based metrics"),
+    DataOnHeap("Amount of bytes used for data on min-heap"),
+    DataOnList("Amount of bytes used for data on eviction short list"),
+    MetaOnHeap("Amount of bytes used for meta data on min-heap"),
+    MetaOnList("Amount of bytes used for meta data on eviction short list"),
+    DataLocked("Amount of locked data in bytes (in use)"),
+    MetaLocked("Amount of locked meta data in bytes (in use)"),
+    HeapSize("Number of buffers on the min-heap"),
+    HeapSizeMax("Capacity (number of buffers) of the min-heap"),
+    ListSize("Number of buffers on the eviction short list"),
+    TotalData("Total amount of bytes, used for data"),
+    TotalMeta("Total amount of bytes, used for meta data");
+
+    private final String description; // metric explanation
+
+    /**
+     * Creates a new enum value.
+     *
+     * @param description The explanation of the metric
+     */
+    PolicyInformation(String description) {
+      this.description = description;
+    }
+
+    @Override
+    public String description() {
+      return description;
+    }
+  }
+
+  /**
+   * Metrics provider for the LRFU cache policy.
+   * An instance of this class is providing JMX (through Hadoop metrics)
+   * statistics for the LRFU cache policy for monitoring.
+   */
+  @Metrics(about = "LRFU Cache Policy Metrics", context = "cache")
+  private class PolicyMetrics implements MetricsSource {
+    public final static int DATAONHEAP = 0;
+    public final static int DATAONLIST = 1;
+    public final static int METAONHEAP = 2;
+    public final static int METAONLIST = 3;
+    public final static int LISTSIZE   = 4;
+    public final static int LOCKEDDATA = 5;
+    public final static int LOCKEDMETA = 6;
+
+    private final String session;        // identifier for the LLAP daemon
+    private final AtomicLong lockedData; // counter for locked data bytes
+    private final AtomicLong lockedMeta; // counter for locked meta bytes
+
+    /**
+     * Creates a new metrics producer.
+     *
+     * @param session The LLAP daemon identifier
+     */
+    public PolicyMetrics(String session) {
+      this.session = session;
+      lockedData = new AtomicLong(0);
+      lockedMeta = new AtomicLong(0);
+    }
+
+    /**
+     * Increments the locked byte count for the specified buffer category.
+     *
+     * @param buff The locked buffer
+     */
+    public void incrementLocked(LlapCacheableBuffer buff) {
+      if (buff instanceof LlapMetadataBuffer) {
+        lockedMeta.addAndGet(buff.getMemoryUsage());
+      } else {
+        lockedData.addAndGet(buff.getMemoryUsage());
+      }
+    }
+
+    /**
+     * Decrements the locked byte count for the specified buffer category.
+     *
+     * @param buff The locked buffer
+     */
+    public void decrementLocked(LlapCacheableBuffer buff) {
+      if (buff instanceof LlapMetadataBuffer) {
+        lockedMeta.addAndGet(-buff.getMemoryUsage());
+      } else {
+        lockedData.addAndGet(-buff.getMemoryUsage());
+      }
+    }
+
+    /**
+     * Helper to get some basic LRFU usage statistics.
+     * This method returns a long array with the following content:
+     * - amount of data (bytes) on min-heap
+     * - amount of data (bytes) on eviction short list
+     * - amount of metadata (bytes) on min-heap
+     * - amount of metadata (bytes) on eviction short list
+     * - size of the eviction short list
+     * - amount of locked bytes for data
+     * - amount of locked bytes for metadata
+     * The heap is scanned under heapLock and the list under listLock, one after
+     * the other, so the two parts are not taken at the same instant.
+     *
+     * @return long array with LRFU stats
+     */
+    public long[] getUsageStats() {
+      long dataOnHeap = 0L;   // all non-meta related buffers on min-heap
+      long dataOnList = 0L;   // all non-meta related buffers on eviction list
+      long metaOnHeap = 0L;   // meta data buffers on min-heap
+      long metaOnList = 0L;   // meta data buffers on eviction list
+      long listSize   = 0L;   // number of entries on eviction list
+
+      // aggregate values on the heap
+      synchronized (heapLock) {
+        for (int heapIdx = 0; heapIdx < heapSize; ++heapIdx) {
+          LlapCacheableBuffer buff = heap[heapIdx];
+
+          if (null != buff) {
+            if (buff instanceof LlapMetadataBuffer) {
+              metaOnHeap += buff.getMemoryUsage();
+            } else {
+              dataOnHeap += buff.getMemoryUsage();
+            }
+          }
+        }
       }
-      sb.append(c + " items");
+
+      // aggregate values on the eviction short list (lock first, then try/finally)
+      listLock.lock();
+      try {
+        LlapCacheableBuffer scan = listHead;
+        while (null != scan) {
+          if (scan instanceof LlapMetadataBuffer) {
+            metaOnList += scan.getMemoryUsage();
+          } else {
+            dataOnList += scan.getMemoryUsage();
+          }
+
+          ++listSize;
+          scan = scan.next;
+        }
+      } finally {
+        listLock.unlock();
+      }
+
+      return new long[] {dataOnHeap, dataOnList,
+                         metaOnHeap, metaOnList, listSize,
+                         lockedData.get(), lockedMeta.get()};
+    }
+
+    @Override
+    public synchronized void getMetrics(MetricsCollector collector, boolean all) {
+      long[] usageStats = getUsageStats();
+
+      // start a new record
+      MetricsRecordBuilder mrb = collector.addRecord(PolicyInformation.PolicyMetrics)
+                                          .setContext("cache")
+                                          .tag(MsInfo.ProcessName,
+                                               MetricsUtils.METRICS_PROCESS_NAME)
+                                          .tag(MsInfo.SessionId, session);
+
+      // add the values to the new record; all of them can shrink, so they are gauges
+      mrb.addGauge(PolicyInformation.DataOnHeap, usageStats[DATAONHEAP])
+         .addGauge(PolicyInformation.DataOnList, usageStats[DATAONLIST])
+         .addGauge(PolicyInformation.MetaOnHeap, usageStats[METAONHEAP])
+         .addGauge(PolicyInformation.MetaOnList, usageStats[METAONLIST])
+         .addGauge(PolicyInformation.DataLocked, usageStats[LOCKEDDATA])
+         .addGauge(PolicyInformation.MetaLocked, usageStats[LOCKEDMETA])
+         .addGauge(PolicyInformation.HeapSize, heapSize)
+         .addGauge(PolicyInformation.HeapSizeMax, maxHeapSize)
+         .addGauge(PolicyInformation.ListSize, usageStats[LISTSIZE])
+         .addGauge(PolicyInformation.TotalData, usageStats[DATAONHEAP] +
+                                                usageStats[DATAONLIST])
+         .addGauge(PolicyInformation.TotalMeta, usageStats[METAONHEAP] +
+                                                usageStats[METAONLIST]);
     }
-    sb.append("\nLRFU eviction heap: " + heapSize + " items");
   }
 }