diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 704f2f14d3..b28f957e55 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -27,6 +27,15 @@ import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.impl.MsInfo; /** * Implementation of the algorithm from "On the Existence of a Spectrum of Policies @@ -65,6 +74,7 @@ private final double expirePriority(long time, long lastAccess, double previous) private int heapSize = 0; private final int maxHeapSize; private EvictionListener evictionListener; + private final PolicyMetrics metrics; public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) { lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); @@ -80,10 +90,25 @@ public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration co heap = new LlapCacheableBuffer[maxHeapSize]; listHead = listTail = null; + + String sessID = conf.get("llap.daemon.metrics.sessionid"); + if (null == sessID) { + sessID = ""; + } + + // register new metrics provider for this cache policy + LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-"+ MetricsUtils.getHostName(), + null, metrics = new PolicyMetrics(sessID)); } @Override public void cache(LlapCacheableBuffer buffer, Priority priority) { + // originally cached items don't get a notifyLock call, so we count there + // byte size here when they enter the cache. + if (buffer.isLocked()) { + metrics.incrementLocked(buffer); + } + // LRFU cache policy doesn't store locked blocks. When we cache, the block is locked, so // we simply do nothing here. The fact that it was never updated will allow us to add it // properly on the first notifyUnlock. @@ -105,18 +130,26 @@ public void cache(LlapCacheableBuffer buffer, Priority priority) { @Override public void notifyLock(LlapCacheableBuffer buffer) { + // add to locked counts in policy metrics + metrics.incrementLocked(buffer); + // We do not proactively remove locked items from the heap, and opportunistically try to // remove from the list (since eviction is mostly from the list). If eviction stumbles upon // a locked item in either, it will remove it from cache; when we unlock, we are going to // put it back or update it, depending on whether this has happened. This should cause // most of the expensive cache update work to happen in unlock, not blocking processing. - if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return; - if (!listLock.tryLock()) return; + if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) + return; + if (!listLock.tryLock()) + return; removeFromListAndUnlock(buffer); } @Override public void notifyUnlock(LlapCacheableBuffer buffer) { + // decrement value in policy metrics + metrics.decrementLocked(buffer); + long time = timer.incrementAndGet(); if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time); @@ -563,4 +596,150 @@ public void debugDumpShort(StringBuilder sb) { } sb.append("\nLRFU eviction heap: " + heapSize + " items"); } + + /** + * Metrics Information for LRFU specific policy information. + * This enumeration is used by the @code PolicyMetrics instance to + * define and describe the metrics. + */ + private enum PolicyInformation implements MetricsInfo { + PolicyMetrics("LRFU cache policy based metrics"), + DataOnHeap("Amount of bytes used for data on min-heap"), + DataOnList("Amount of bytes used for data on eviction short list"), + MetaOnHeap("Amount of bytes used for meta data on min-heap"), + MetaOnList("Amount of bytes used for meta data on eviction short list"), + DataLocked("Amount of locked data in bytes (in use)"), + MetaLocked("Amount of locked meta data in bytes (in use)"), + HeapSize("Number of buffers on the min-heap"), + HeapSizeMax("Capacity (number of buffers) of the min-heap"), + ListSize("Number of buffers on the eviction short list"), + TotalData("Total amount of bytes, used for data"), + TotalMeta("Total amount of bytes, used for meta data"); + + private final String description; // metric explaination + + /** + * Creates a new enum value. + * + * @param description The explaination of the metric + */ + PolicyInformation(String description) { + this.description = description; + } + + @Override + public String description() { + return description; + } + } + + /** + * Metrics provider for the LRFU cache policy. + * An instance of this class is providing JMX (through haddoop metrics) + * statistics for the LRFU cache policy for monitoring. + */ + @Metrics(about = "LRFU Cache Policy Metrics", context = "cache") + private class PolicyMetrics implements MetricsSource { + private final String session; // identifier for the LLAP daemon + private final AtomicLong lockedData; // counter for locked data buffers + private final AtomicLong lockedMeta; // counter for locked meta data buffers + + /** + * Creates a new metrics producer. + * + * @param session The LLAP daemon identifier + */ + public PolicyMetrics(String session) { + this.session = session; + lockedData = new AtomicLong(0); + lockedMeta = new AtomicLong(0); + } + + /** + * Increments the locked byte count for the specified buffer category. + * + * @param buff The locked buffer + */ + public void incrementLocked(LlapCacheableBuffer buff) { + if (buff instanceof LlapMetadataBuffer) { + lockedMeta.addAndGet(buff.getMemoryUsage()); + } else { + lockedData.addAndGet(buff.getMemoryUsage()); + } + } + + /** + * Decrements the locked byte count for the specified buffer category. + * + * @param buff The locked buffer + */ + public void decrementLocked(LlapCacheableBuffer buff) { + if (buff instanceof LlapMetadataBuffer) { + lockedMeta.addAndGet(-buff.getMemoryUsage()); + } else { + lockedData.addAndGet(-buff.getMemoryUsage()); + } + } + + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + long dataOnHeap = 0L; // all non-meta related buffers on min-heap + long dataOnList = 0L; // all non-meta related buffers on eviction list + long metaOnHeap = 0L; // meta data buffers on min-heap + long metaOnList = 0L; // meta data buffers on eviction list + long listSize = 0L; // number of entries on eviction list + + // start a new record + MetricsRecordBuilder mrb = collector.addRecord(PolicyInformation.PolicyMetrics) + .setContext("cache") + .tag(MsInfo.ProcessName, + MetricsUtils.METRICS_PROCESS_NAME) + .tag(MsInfo.SessionId, session); + // aggregate values on the heap + synchronized (heapLock) { + for (int heapIdx = 0; heapIdx < heapSize; ++heapIdx) { + LlapCacheableBuffer buff = heap[heapIdx]; + + if (null != buff) { + if (buff instanceof LlapMetadataBuffer) { + metaOnHeap += buff.getMemoryUsage(); + } else { + dataOnHeap += buff.getMemoryUsage(); + } + } + } + } + + // aggregate values on the evicition short list + try { + listLock.lock(); + LlapCacheableBuffer scan = listHead; + while (null != scan) { + if (scan instanceof LlapMetadataBuffer) { + metaOnList += scan.getMemoryUsage(); + } else { + dataOnList += scan.getMemoryUsage(); + } + + ++listSize; + scan = scan.next; + } + } finally { + listLock.unlock(); + } + + // add the values to the new record + mrb.addCounter(PolicyInformation.DataOnHeap, dataOnHeap) + .addCounter(PolicyInformation.DataOnList, dataOnList) + .addCounter(PolicyInformation.MetaOnHeap, metaOnHeap) + .addCounter(PolicyInformation.MetaOnList, metaOnList) + .addCounter(PolicyInformation.DataLocked, lockedData.get()) + .addCounter(PolicyInformation.MetaLocked, lockedMeta.get()) + .addCounter(PolicyInformation.HeapSize, heapSize) + .addCounter(PolicyInformation.HeapSizeMax, maxHeapSize) + .addCounter(PolicyInformation.ListSize, listSize) + .addCounter(PolicyInformation.TotalData, dataOnHeap + dataOnList) + .addCounter(PolicyInformation.TotalMeta, metaOnHeap + metaOnList); + } + } }