diff --git llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java index dc8484b..d82757f 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java @@ -23,4 +23,5 @@ public interface LlapIo { InputFormat getInputFormat(InputFormat sourceInputFormat); + void close(); } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java index 9bfce38..c48af7b 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java @@ -69,4 +69,10 @@ private static LlapIo createIoImpl(Configuration conf) throws IOException { throw new RuntimeException("Failed to create impl class", e); } } + + public static void close() { + if (io != null) { + io.close(); + } + } } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java index 4ac83ba..2ee3e36 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java @@ -20,15 +20,19 @@ import java.nio.ByteBuffer; +import org.apache.hadoop.metrics2.MetricsSource; + public abstract class LlapMemoryBuffer { protected LlapMemoryBuffer() { } - protected void initialize(ByteBuffer byteBuffer, int offset, int length) { + protected void initialize(ByteBuffer byteBuffer, int offset, int length, MetricsSource metrics) { this.byteBuffer = byteBuffer.slice(); this.byteBuffer.position(offset); this.byteBuffer.limit(offset + length); + this.metrics = metrics; } /** Note - position/limit of this should NOT be modified after it's in cache. 
We could add a wrapper to enforce that, but for now it's shared and should be duplicated. */ public ByteBuffer byteBuffer; + public MetricsSource metrics; } \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index fcd9ff4..fac23cb 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -26,8 +26,9 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; -public final class BuddyAllocator implements Allocator { +public final class BuddyAllocator implements Allocator, BuddyAllocatorMXBean { private final Arena[] arenas; private AtomicInteger allocatedArenas = new AtomicInteger(0); @@ -38,8 +39,10 @@ private final int minAllocation, maxAllocation, arenaSize; private final long maxSize; private final boolean isDirect; + private final LlapDaemonCacheMetrics metrics; - public BuddyAllocator(Configuration conf, MemoryManager memoryManager) { + public BuddyAllocator(Configuration conf, MemoryManager memoryManager, + LlapDaemonCacheMetrics metrics) { isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT); minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC); maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC); @@ -79,6 +82,9 @@ public BuddyAllocator(Configuration conf, MemoryManager memoryManager) { arenas[0].init(); allocatedArenas.set(1); this.memoryManager = memoryManager; + + this.metrics = metrics; + metrics.incrAllocatedArena(); } // TODO: would it make sense to return buffers asynchronously? 
@@ -136,6 +142,7 @@ public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) { @Override public void deallocate(LlapMemoryBuffer buffer) { LlapCacheableBuffer buf = (LlapCacheableBuffer)buffer; + metrics.decrCacheCapacityUsed(buf.byteBuffer.capacity()); arenas[buf.arenaIndex].deallocate(buf); } @@ -154,6 +161,32 @@ public String debugDump() { return result.toString(); } + // BuddyAllocatorMXBean + @Override + public boolean getIsDirect() { + return isDirect; + } + + @Override + public int getMinAllocation() { + return minAllocation; + } + + @Override + public int getMaxAllocation() { + return maxAllocation; + } + + @Override + public int getArenaSize() { + return arenaSize; + } + + @Override + public long getMaxCacheSize() { + return maxSize; + } + private class Arena { private ByteBuffer data; // Avoid storing headers with data since we expect binary size allocations. @@ -260,7 +293,7 @@ private int allocateWithSplit(int arenaIx, int freeListIx, lastSplitBlocksRemaining = splitWays - toTake; for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) { headers[headerIx] = headerData; - ((LlapCacheableBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize); + ((LlapCacheableBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize, metrics); } lastSplitNextHeader = headerIx; headerIx = data.getInt(origOffset + 4); @@ -312,6 +345,7 @@ private int allocateWithExpand( if (data == null) { init(); allocatedArenas.incrementAndGet(); + metrics.incrAllocatedArena(); } } } @@ -330,7 +364,7 @@ public int allocateFromFreeListUnderLock(int arenaIx, FreeList freeList, // Noone else has this either allocated or in a different free list; no sync needed. 
headers[current] = makeHeader(freeListIx, true); current = data.getInt(offset + 4); - ((LlapCacheableBuffer)dest[ix]).initialize(arenaIx, data, offset, size); + ((LlapCacheableBuffer)dest[ix]).initialize(arenaIx, data, offset, size, metrics); ++ix; } replaceListHeadUnderLock(freeList, current); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocatorMXBean.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocatorMXBean.java new file mode 100644 index 0000000..c7c1326 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocatorMXBean.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.cache; + +import javax.management.MXBean; + +/** + * MXbean to expose cache allocator related information through JMX. + */ +@MXBean +public interface BuddyAllocatorMXBean { + + /** + * Gets if bytebuffers are allocated directly offheap. + * + * @return gets if direct bytebuffer allocation + */ + public boolean getIsDirect(); + + /** + * Gets minimum allocation size of allocator. + * + * @return minimum allocation size + */ + public int getMinAllocation(); + + /** + * Gets maximum allocation size of allocator. 
+ * + * @return maximum allocation size + */ + public int getMaxAllocation(); + + /** + * Gets the arena size. + * + * @return arena size + */ + public int getArenaSize(); + + /** + * Gets the maximum cache size. + * + * @return max cache size + */ + public long getMaxCacheSize(); +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java index 7b9a77a..6f5265a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import com.google.common.annotations.VisibleForTesting; @@ -31,8 +32,9 @@ private static final int EVICTED_REFCOUNT = -1; static final int IN_LIST = -2, NOT_IN_CACHE = -1; - public void initialize(int arenaIndex, ByteBuffer byteBuffer, int offset, int length) { - super.initialize(byteBuffer, offset, length); + public void initialize(int arenaIndex, ByteBuffer byteBuffer, int offset, int length, + LlapDaemonCacheMetrics metrics) { + super.initialize(byteBuffer, offset, length, metrics); this.arenaIndex = arenaIndex; } @@ -82,6 +84,7 @@ int incRef() { if (DebugUtils.isTraceLockingEnabled()) { LlapIoImpl.LOG.info("Locked " + this + "; new ref count " + newRefCount); } + ((LlapDaemonCacheMetrics)metrics).incrCacheNumLockedBuffers(); return newRefCount; } @@ -103,6 +106,7 @@ int decRef() { if (newRefCount < 0) { throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this); } + ((LlapDaemonCacheMetrics)metrics).decrCacheNumLockedBuffers(); return newRefCount; } diff --git 
llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java index a691a68..4ddfafa 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java @@ -19,15 +19,12 @@ import java.nio.ByteBuffer; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.DiskRangeList; import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper; @@ -35,27 +32,29 @@ import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk; import com.google.common.annotations.VisibleForTesting; public class LowLevelCacheImpl implements LowLevelCache, EvictionListener { + private static final int DEFAULT_CLEANUP_INTERVAL = 600; private final Allocator allocator; - private AtomicInteger newEvictions = new AtomicInteger(0); private Thread cleanupThread = null; private final ConcurrentHashMap cache = new ConcurrentHashMap(); private final LowLevelCachePolicy cachePolicy; private final long cleanupInterval; + private LlapDaemonCacheMetrics metrics; - public LowLevelCacheImpl( - Configuration conf, LowLevelCachePolicy cachePolicy, Allocator allocator) { - this(conf, cachePolicy, allocator, 600); + public LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy 
cachePolicy, + Allocator allocator) { + this(metrics, cachePolicy, allocator, DEFAULT_CLEANUP_INTERVAL); } @VisibleForTesting - LowLevelCacheImpl(Configuration conf, + LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy, Allocator allocator, long cleanupInterval) { if (LlapIoImpl.LOGL.isInfoEnabled()) { LlapIoImpl.LOG.info("Low level cache; cleanup interval " + cleanupInterval + "sec"); @@ -63,6 +62,7 @@ public LowLevelCacheImpl( this.cachePolicy = cachePolicy; this.allocator = allocator; this.cleanupInterval = cleanupInterval; + this.metrics = metrics; } public void init() { @@ -79,6 +79,7 @@ public void allocateMultiple(LlapMemoryBuffer[] dest, int size) { @Override public DiskRangeList getFileData(long fileId, DiskRangeList ranges, long baseOffset) { if (ranges == null) return null; + metrics.incrCacheRequestedBytes(ranges.getLength()); FileCache subCache = cache.get(fileId); if (subCache == null || !subCache.incRef()) return ranges; try { @@ -129,6 +130,7 @@ private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCache currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, baseOffset); // Now that we've added it into correct position, we can adjust it by base offset. 
currentCached.shiftBy(-baseOffset); + metrics.incrCacheHitBytes(currentCached.getLength()); } } @@ -296,7 +298,7 @@ public void releaseBufferInternal(LlapCacheableBuffer buffer) { private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]); public static LlapCacheableBuffer allocateFake() { LlapCacheableBuffer fake = new LlapCacheableBuffer(); - fake.initialize(-1, fakeBuf, 0, 1); + fake.initialize(-1, fakeBuf, 0, 1, null); return fake; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index 6ea129b..4cfe55f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -24,16 +24,21 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; public class LowLevelCacheMemoryManager implements MemoryManager { private final AtomicLong usedMemory; protected final long maxSize; private final LowLevelCachePolicy evictor; + private LlapDaemonCacheMetrics metrics; - public LowLevelCacheMemoryManager(Configuration conf, LowLevelCachePolicy evictor) { + public LowLevelCacheMemoryManager(Configuration conf, LowLevelCachePolicy evictor, + LlapDaemonCacheMetrics metrics) { this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE); this.evictor = evictor; this.usedMemory = new AtomicLong(0); + this.metrics = metrics; + metrics.incrCacheCapacityTotal(maxSize); if (LlapIoImpl.LOGL.isInfoEnabled()) { LlapIoImpl.LOG.info("Cache memory manager initialized with max size " + maxSize); } @@ -61,6 +66,7 @@ public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) { usedMem = usedMemory.get(); } } + 
metrics.incrCacheCapacityUsed(memoryToReserve); return true; } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 4b43d41..33dd63e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -25,17 +25,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.CallableWithNdc; +import org.apache.hadoop.hive.llap.daemon.ContainerRunner; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -46,16 +43,21 @@ import org.apache.log4j.NDC; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; -import org.apache.hadoop.hive.common.CallableWithNdc; -import org.apache.hadoop.hive.llap.daemon.ContainerRunner; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; 
import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.task.TezChild; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; -import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; public class ContainerRunnerImpl extends AbstractService implements ContainerRunner { @@ -69,11 +71,12 @@ private final Map localEnv = new HashMap(); private volatile FileSystem localFs; private final long memoryPerExecutor; + private final LlapDaemonExecutorMetrics metrics; // TODO Support for removing queued containers, interrupting / killing specific containers public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, int localShufflePort, - AtomicReference localAddress, - long totalMemoryAvailableBytes) { + AtomicReference localAddress, + long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics) { super("ContainerRunnerImpl"); Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); @@ -92,7 +95,7 @@ public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, int localSh // 80% of memory considered for accounted buffers. Rest for objects. // TODO Tune this based on the available size. 
this.memoryPerExecutor = (long)(totalMemoryAvailableBytes * 0.8 / (float) numExecutors); - + this.metrics = metrics; LOG.info("ContainerRunnerImpl config: " + "memoryPerExecutorDerviced=" + memoryPerExecutor ); @@ -173,6 +176,8 @@ public void queueContainer(RunContainerRequestProto request) throws IOException ListenableFuture future = executorService .submit(callable); Futures.addCallback(future, new ContainerRunnerCallback(request, callable)); + metrics.incrExecutorTotalRequestsHandled(); + metrics.incrExecutorNumQueuedRequests(); } finally { NDC.pop(); } @@ -248,22 +253,27 @@ public void onSuccess(ContainerExecutionResult result) { case SUCCESS: LOG.info("Successfully finished: " + request.getApplicationIdString() + ", containerId=" + request.getContainerIdString()); + metrics.incrExecutorTotalSuccess(); break; case EXECUTION_FAILURE: LOG.info("Failed to run: " + request.getApplicationIdString() + ", containerId=" + request.getContainerIdString(), result.getThrowable()); + metrics.incrExecutorTotalExecutionFailed(); break; case INTERRUPTED: LOG.info( "Interrupted while running: " + request.getApplicationIdString() + ", containerId=" + request.getContainerIdString(), result.getThrowable()); + metrics.incrExecutorTotalInterrupted(); break; case ASKED_TO_DIE: LOG.info( "Asked to die while running: " + request.getApplicationIdString() + ", containerId=" + request.getContainerIdString()); + metrics.incrExecutorTotalAskedToDie(); break; } + metrics.decrExecutorNumQueuedRequests(); } @Override @@ -275,6 +285,7 @@ public void onFailure(Throwable t) { if (tezChild != null) { tezChild.shutdown(); } + metrics.decrExecutorNumQueuedRequests(); } } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 0cad851..e398652 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -19,17 +19,26 @@ import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.base.Preconditions; +import javax.management.ObjectName; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; -import org.apache.log4j.Logger; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; import org.apache.hadoop.hive.llap.io.api.LlapIoProxy; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.log4j.Logger; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; -public class LlapDaemon extends AbstractService implements ContainerRunner { +public class LlapDaemon extends AbstractService implements ContainerRunner, LlapDaemonMXBean { private static final Logger LOG = Logger.getLogger(LlapDaemon.class); @@ -40,6 +49,12 @@ private final ContainerRunnerImpl containerRunner; private final String[] localDirs; private final int shufflePort; + private final long memoryPerInstance; + private final long maxJvmMemory; + private JvmPauseMonitor pauseMonitor; + private final ObjectName llapDaemonInfoBean; + private final LlapDaemonExecutorMetrics metrics; + // TODO Not the best way to share the address private final AtomicReference address = new AtomicReference(); @@ -55,18 +70,18 @@ public LlapDaemon(LlapDaemonConfiguration daemonConf) { this.localDirs = 
daemonConf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_WORK_DIRS); this.shufflePort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, -1); - long memoryAvailableBytes = this.daemonConf + memoryPerInstance = this.daemonConf .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l; - long jvmMax = Runtime.getRuntime().maxMemory(); + maxJvmMemory = Runtime.getRuntime().maxMemory(); LOG.info("LlapDaemon started with the following configuration: " + "numExecutors=" + numExecutors + ", rpcListenerPort=" + rpcPort + ", workDirs=" + Arrays.toString(localDirs) + ", shufflePort=" + shufflePort + - ", memoryConfigured=" + memoryAvailableBytes + - ", jvmAvailableMemory=" + jvmMax); + ", memoryConfigured=" + memoryPerInstance + + ", jvmAvailableMemory=" + maxJvmMemory); Preconditions.checkArgument(this.numExecutors > 0); Preconditions.checkArgument(this.rpcPort > 1024 && this.rpcPort < 65536, @@ -74,13 +89,26 @@ public LlapDaemon(LlapDaemonConfiguration daemonConf) { Preconditions.checkArgument(this.localDirs != null && this.localDirs.length > 0, "Work dirs must be specified"); Preconditions.checkArgument(this.shufflePort > 0, "ShufflePort must be specified"); - Preconditions.checkState(jvmMax >= memoryAvailableBytes, - "Invalid configuration. Xmx value too small. maxAvailable=" + jvmMax + ", configured=" + - memoryAvailableBytes); + Preconditions.checkState(maxJvmMemory >= memoryPerInstance, + "Invalid configuration. Xmx value too small. 
maxAvailable=" + maxJvmMemory + ", configured=" + + memoryPerInstance); + + // Initialize the metric system + LlapMetricsSystem.initialize("LlapDaemon"); + this.pauseMonitor = new JvmPauseMonitor(daemonConf); + pauseMonitor.start(); + String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName(); + String sessionId = MetricsUtils.getUUID(); + this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors); + metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); + this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this); + LOG.info("Started LlapMetricsSystem with displayName: " + displayName + + " sessionId: " + sessionId); this.server = new LlapDaemonProtocolServerImpl(daemonConf, this, address); this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, shufflePort, address, - memoryAvailableBytes); + memoryPerInstance, metrics); + } @Override @@ -95,16 +123,33 @@ public void serviceInit(Configuration conf) { public void serviceStart() { server.start(); containerRunner.start(); - } public void serviceStop() { + shutdown(); containerRunner.stop(); server.stop(); } + public void shutdown() { + LOG.info("LlapDaemon shutdown invoked"); + if (llapDaemonInfoBean != null) { + MBeans.unregister(llapDaemonInfoBean); + } + + if (pauseMonitor != null) { + pauseMonitor.stop(); + } + + if (metrics != null) { + LlapMetricsSystem.shutdown(); + } + + LlapIoProxy.close(); + } public static void main(String[] args) throws Exception { + LlapDaemon llapDaemon = null; try { LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration(); @@ -113,7 +158,7 @@ public static void main(String[] args) throws Exception { daemonConf.get(LlapDaemonConfiguration.LLAP_DAEMON_WORK_DIRS)); ShuffleHandler.initializeAndStart(shuffleHandlerConf); - LlapDaemon llapDaemon = new LlapDaemon(daemonConf); + llapDaemon = new LlapDaemon(daemonConf); llapDaemon.init(daemonConf); llapDaemon.start(); LOG.info("Started LlapDaemon"); 
@@ -121,13 +166,46 @@ public static void main(String[] args) throws Exception { } catch (Throwable t) { // TODO Replace this with a ExceptionHandler / ShutdownHook LOG.warn("Failed to start LLAP Daemon with exception", t); + if (llapDaemon != null) { + llapDaemon.shutdown(); + } System.exit(-1); } } - @Override public void queueContainer(RunContainerRequestProto request) throws IOException { containerRunner.queueContainer(request); } + + // LlapDaemonMXBean methods. Will be exposed via JMX + @Override + public int getRpcPort() { + return rpcPort; + } + + @Override + public int getNumExecutors() { + return numExecutors; + } + + @Override + public int getShufflePort() { + return shufflePort; + } + + @Override + public String getLocalDirs() { + return Joiner.on(",").skipNulls().join(localDirs); + } + + @Override + public long getMemoryPerInstance() { + return memoryPerInstance; + } + + @Override + public long getMaxJvmMemory() { + return maxJvmMemory; + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java new file mode 100644 index 0000000..cf1a8a4 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import javax.management.MXBean; + +/** + * MXbean to expose llap daemon related information through JMX. + */ +@MXBean +public interface LlapDaemonMXBean { + + /** + * Gets the rpc port. + * @return the rpc port + */ + public int getRpcPort(); + + /** + * Gets the number of executors. + * @return number of executors + */ + public int getNumExecutors(); + + /** + * Gets the shuffle port. + * @return the shuffle port + */ + public int getShufflePort(); + + /** + * CSV list of local directories + * @return local dirs + */ + public String getLocalDirs(); + + /** + * Gets llap daemon configured memory per instance. + * @return memory per instance + */ + public long getMemoryPerInstance(); + + /** + * Gets max available jvm memory. 
+ * @return max jvm memory + */ + public long getMaxJvmMemory(); +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 8443dfe..b76db78 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.concurrent.Executors; +import javax.management.ObjectName; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,9 +42,14 @@ import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.util.JvmPauseMonitor; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -53,37 +60,58 @@ private final ColumnVectorProducer cvp; private final ListeningExecutorService executor; + private final Configuration conf; + private LlapDaemonCacheMetrics metrics; + private JvmPauseMonitor pauseMonitor; + private ObjectName buddyAllocatorMXBean; + private Allocator allocator; private LlapIoImpl(Configuration conf) throws IOException { + this.conf = conf; boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE); // High-level cache not supported yet. 
if (LOGL.isInfoEnabled()) { LOG.info("Initializing LLAP IO" + (useLowLevelCache ? " with low level cache" : "")); } + + String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName(); + // TODO: Find a better way to pass in session id + String sessionId = conf.get("llap.daemon.sessionid"); + this.metrics = LlapDaemonCacheMetrics.create(displayName, sessionId); + LOG.info("Started LlapDaemonCacheMetrics with displayName: " + displayName + + " sessionId: " + sessionId); + Cache cache = useLowLevelCache ? null : new NoopCache(); - LowLevelCacheImpl orcCache = createLowLevelCache(conf, useLowLevelCache); + LowLevelCacheImpl orcCache = createLowLevelCache(conf, useLowLevelCache, metrics); OrcMetadataCache metadataCache = OrcMetadataCache.getInstance(); // Arbitrary thread pool. Listening is used for unhandled errors for now (TODO: remove?) executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); // TODO: this should depends on input format and be in a map, or something. - this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf); + this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf, metrics); if (LOGL.isInfoEnabled()) { LOG.info("LLAP IO initialized"); } + + registerMXBeans(); } - private LowLevelCacheImpl createLowLevelCache(Configuration conf, boolean useLowLevelCache) { + private void registerMXBeans() { + buddyAllocatorMXBean = MBeans.register("LlapDaemon", "BuddyAllocatorInfo", allocator); + } + + private LowLevelCacheImpl createLowLevelCache(Configuration conf, boolean useLowLevelCache, + LlapDaemonCacheMetrics metrics) { if (!useLowLevelCache) return null; boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU); LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf); // Memory manager uses cache policy to trigger evictions. 
- LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(conf, cachePolicy); + LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(conf, cachePolicy, metrics); // Allocator uses memory manager to request memory. - Allocator allocator = new BuddyAllocator(conf, memManager); + allocator = new BuddyAllocator(conf, memManager, metrics); // Cache uses allocator to allocate and deallocate. - LowLevelCacheImpl orcCache = new LowLevelCacheImpl(conf, cachePolicy, allocator); + LowLevelCacheImpl orcCache = new LowLevelCacheImpl(metrics, cachePolicy, allocator); // And finally cache policy uses cache to notify it of eviction. The cycle is complete! cachePolicy.setEvictionListener(orcCache); orcCache.init(); @@ -96,4 +124,21 @@ private LowLevelCacheImpl createLowLevelCache(Configuration conf, boolean useLow InputFormat sourceInputFormat) { return new LlapInputFormat(sourceInputFormat, cvp, executor); } + + public LlapDaemonCacheMetrics getMetrics() { + return metrics; + } + + @Override + public void close() { + LOG.info("Closing LlapIoImpl.."); + if (pauseMonitor != null) { + pauseMonitor.stop(); + } + + if (buddyAllocatorMXBean != null) { + MBeans.unregister(buddyAllocatorMXBean); + buddyAllocatorMXBean = null; + } + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 7ba337a..56d3fe2 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey; import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import 
org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.mapred.InputSplit; @@ -41,9 +42,11 @@ private final LowLevelCache lowLevelCache; private final Configuration conf; private boolean _skipCorrupt; // TODO: get rid of this + private LlapDaemonCacheMetrics metrics; public OrcColumnVectorProducer(OrcMetadataCache metadataCache, - LowLevelCacheImpl lowLevelCache, Cache cache, Configuration conf) { + LowLevelCacheImpl lowLevelCache, Cache cache, Configuration conf, + LlapDaemonCacheMetrics metrics) { if (LlapIoImpl.LOGL.isInfoEnabled()) { LlapIoImpl.LOG.info("Initializing ORC column vector producer"); } @@ -53,12 +56,14 @@ public OrcColumnVectorProducer(OrcMetadataCache metadataCache, this.cache = cache; this.conf = conf; this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA); + this.metrics = metrics; } @Override public ReadPipeline createReadPipeline( Consumer consumer, InputSplit split, List columnIds, SearchArgument sarg, String[] columnNames) { + metrics.incrCacheReadRequests(); OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer( consumer, columnIds.size(), _skipCorrupt); OrcEncodedDataReader reader = new OrcEncodedDataReader( diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java new file mode 100644 index 0000000..6775985 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +import org.apache.hadoop.metrics2.MetricsInfo; + +import com.google.common.base.Objects; + +/** + * Metrics information for llap cache. + */ +public enum LlapDaemonCacheInfo implements MetricsInfo { + LLAP_DAEMON_CACHE_METRICS("Llap daemon cache related metrics"), + CACHE_CAPACITY_REMAINING("Amount of memory available in cache in bytes"), + CACHE_CAPACITY_TOTAL("Total amount of memory allocated for cache in bytes"), + CACHE_CAPACITY_USED("Amount of memory used in cache in bytes"), + CACHE_REQUESTED_BYTES("Disk ranges that are requested in bytes"), + CACHE_HIT_BYTES("Disk ranges that are cached in bytes"), + CACHE_HIT_RATIO("Ratio of disk ranges cached vs requested"), + CACHE_READ_REQUESTS("Number of disk range requests to cache"), + CACHE_ALLOCATED_ARENA("Number of arenas allocated"), + CACHE_NUM_LOCKED_BUFFERS("Number of locked buffers in cache"); + + private final String desc; + + LlapDaemonCacheInfo(String desc) { + this.desc = desc; + } + + @Override + public String description() { + return desc; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name()).add("description", desc) + .toString(); + } +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java new file mode 100644 index 0000000..80e78ea --- /dev/null +++ 
llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_ALLOCATED_ARENA; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_CAPACITY_REMAINING; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_CAPACITY_TOTAL; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_CAPACITY_USED; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_HIT_BYTES; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_HIT_RATIO; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_NUM_LOCKED_BUFFERS; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_READ_REQUESTS; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_REQUESTED_BYTES; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.LLAP_DAEMON_CACHE_METRICS; +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static 
org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * Llap daemon cache metrics source. + */ +@Metrics(about = "LlapDaemon Cache Metrics", context = "llap") +public class LlapDaemonCacheMetrics implements MetricsSource { + final String name; + // TODO: SessionId should come from llap daemon. For now using random UUID. + private String sessionId; + private final MetricsRegistry registry; + + @Metric + MutableCounterLong cacheReadRequests; + @Metric + MutableCounterLong cacheCapacityTotal; + @Metric + MutableCounterLong cacheCapacityUsed; + @Metric + MutableCounterLong cacheRequestedBytes; + @Metric + MutableCounterLong cacheHitBytes; + @Metric + MutableCounterLong cacheAllocatedArena; + @Metric + MutableCounterLong cacheNumLockedBuffers; + + private LlapDaemonCacheMetrics(String name, String sessionId) { + this.name = name; + this.sessionId = sessionId; + this.registry = new MetricsRegistry("LlapDaemonCacheRegistry"); + } + + public static LlapDaemonCacheMetrics create(String displayName, String sessionId) { + MetricsSystem ms = LlapMetricsSystem.instance(); + return ms.register(displayName, null, new LlapDaemonCacheMetrics(displayName, sessionId)); + } + + public void incrCacheCapacityTotal(long delta) { + cacheCapacityTotal.incr(delta); + } + + public void incrCacheCapacityUsed(long delta) { + cacheCapacityUsed.incr(delta); + } + + public void decrCacheCapacityUsed(int delta) { + cacheCapacityUsed.incr(-delta); + } + + public void incrCacheRequestedBytes(long delta) { + cacheRequestedBytes.incr(delta); + } + + public void 
incrCacheHitBytes(long delta) { + cacheHitBytes.incr(delta); + } + + public void incrCacheReadRequests() { + cacheReadRequests.incr(); + } + + public void incrAllocatedArena() { + cacheAllocatedArena.incr(); + } + + public void incrCacheNumLockedBuffers() { + cacheNumLockedBuffers.incr(); + } + + public void decrCacheNumLockedBuffers() { + cacheNumLockedBuffers.incr(-1); + } + + public String getName() { + return name; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean b) { + MetricsRecordBuilder rb = collector.addRecord(LLAP_DAEMON_CACHE_METRICS) + .setContext("llap").tag(ProcessName, "LlapDaemon") + .tag(SessionId, sessionId); + getCacheStats(rb); + } + + private void getCacheStats(MetricsRecordBuilder rb) { + float cacheHitRatio = cacheRequestedBytes.value() == 0 ? 0.0f : + (float) cacheHitBytes.value() / (float) cacheRequestedBytes.value(); + + rb.addCounter(CACHE_CAPACITY_REMAINING, cacheCapacityTotal.value() - cacheCapacityUsed.value()) + .addCounter(CACHE_CAPACITY_TOTAL, cacheCapacityTotal.value()) + .addCounter(CACHE_CAPACITY_USED, cacheCapacityUsed.value()) + .addCounter(CACHE_READ_REQUESTS, cacheReadRequests.value()) + .addCounter(CACHE_REQUESTED_BYTES, cacheRequestedBytes.value()) + .addCounter(CACHE_HIT_BYTES, cacheHitBytes.value()) + .addCounter(CACHE_ALLOCATED_ARENA, cacheAllocatedArena.value()) + .addCounter(CACHE_NUM_LOCKED_BUFFERS, cacheNumLockedBuffers.value()) + .addGauge(CACHE_HIT_RATIO, cacheHitRatio); + } + +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonContainerRunnerInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonContainerRunnerInfo.java new file mode 100644 index 0000000..8e11ab1 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonContainerRunnerInfo.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +/** + * + */ +public class LlapDaemonContainerRunnerInfo { +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java new file mode 100644 index 0000000..b7e06d5 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.metrics; + +import org.apache.hadoop.metrics2.MetricsInfo; + +import com.google.common.base.Objects; + +/** + * Metrics information for llap daemon container. + */ +public enum LlapDaemonExecutorInfo implements MetricsInfo { + LLAP_DAEMON_EXECUTOR_METRICS("Llap daemon cache related metrics"), + EXECUTOR_THREAD_CPU_TIME("Cpu time in nanoseconds"), + EXECUTOR_THREAD_USER_TIME("User time in nanoseconds"), + EXECUTOR_THREAD_SYSTEM_TIME("System time in nanoseconds"), + EXECUTOR_TOTAL_REQUESTS_HANDLED("Total number of requests handled by the container"), + EXECUTOR_NUM_QUEUED_REQUESTS("Number of requests queued by the container for processing"), + EXECUTOR_TOTAL_SUCCESS("Total number of requests handled by the container that succeeded"), + EXECUTOR_TOTAL_EXECUTION_FAILURE("Total number of requests handled by the container that failed execution"), + EXECUTOR_TOTAL_INTERRUPTED("Total number of requests handled by the container that got interrupted"), + EXECUTOR_TOTAL_ASKED_TO_DIE("Total number of requests handled by the container that were asked to die"); + + private final String desc; + + LlapDaemonExecutorInfo(String desc) { + this.desc = desc; + } + + @Override + public String description() { + return desc; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name()).add("description", desc) + .toString(); + } +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java new file mode 100644 index 0000000..e376495 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_NUM_QUEUED_REQUESTS; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_ASKED_TO_DIE; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_EXECUTION_FAILURE; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_INTERRUPTED; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_REQUESTS_HANDLED; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_SUCCESS; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.LLAP_DAEMON_EXECUTOR_METRICS; +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import 
org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.source.JvmMetrics; + +/** + * Metrics about the llap daemon executors. + */ +@Metrics(about = "LlapDaemon Executor Metrics", context = "llap") +public class LlapDaemonExecutorMetrics implements MetricsSource { + + private final String name; + private final JvmMetrics jvmMetrics; + private final String sessionId; + private final MetricsRegistry registry; + private final int numExecutors; + + @Metric + MutableCounterLong[] executorThreadCpuTime; + @Metric + MutableCounterLong[] executorThreadUserTime; + @Metric + MutableCounterLong[] executorThreadSystemTime; + @Metric + MutableCounterLong executorTotalRequestHandled; + @Metric + MutableCounterLong executorNumQueuedRequests; + @Metric + MutableCounterLong executorTotalSuccess; + @Metric + MutableCounterLong executorTotalInterrupted; + @Metric + MutableCounterLong executorTotalExecutionFailed; + @Metric + MutableCounterLong executorTotalAskedToDie; + + private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, + int numExecutors) { + this.name = displayName; + this.jvmMetrics = jm; + this.sessionId = sessionId; + this.registry = new MetricsRegistry("LlapDaemonExecutorRegistry"); + this.numExecutors = numExecutors; + this.executorThreadCpuTime = new MutableCounterLong[numExecutors]; + this.executorThreadUserTime = new MutableCounterLong[numExecutors]; + this.executorThreadSystemTime = new MutableCounterLong[numExecutors]; + } + + public static LlapDaemonExecutorMetrics create(String displayName, String sessionId, + int numExecutors) { + MetricsSystem ms = LlapMetricsSystem.instance(); + JvmMetrics jm = JvmMetrics.create("LlapDaemon", sessionId, ms); + return ms.register(displayName, "LlapDaemon Executor Metrics", + new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors)); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean b) { + MetricsRecordBuilder rb = 
collector.addRecord(LLAP_DAEMON_EXECUTOR_METRICS) + .setContext("llap").tag(ProcessName, "LlapDaemon") + .tag(SessionId, sessionId); + getExecutorStats(rb); + } + + // Assumption here is threadId is from 0 to numExecutors - 1 + public void incrExecutorThreadCpuTime(int threadId, int delta) { + executorThreadCpuTime[threadId].incr(delta); + } + + public void incrExecutorThreadUserTime(int threadId, int delta) { + executorThreadUserTime[threadId].incr(delta); + } + + public void incrExecutorThreadSystemTime(int threadId, int delta) { + executorThreadSystemTime[threadId].incr(delta); + } + + public void incrExecutorTotalRequestsHandled() { + executorTotalRequestHandled.incr(); + } + + public void incrExecutorNumQueuedRequests() { + executorNumQueuedRequests.incr(); + } + + public void decrExecutorNumQueuedRequests() { + executorNumQueuedRequests.incr(-1); + } + + public void incrExecutorTotalSuccess() { + executorTotalSuccess.incr(); + } + + public void incrExecutorTotalExecutionFailed() { + executorTotalExecutionFailed.incr(); + } + + public void incrExecutorTotalInterrupted() { + executorTotalInterrupted.incr(); + } + + public void incrExecutorTotalAskedToDie() { + executorTotalAskedToDie.incr(); + } + + private void getExecutorStats(MetricsRecordBuilder rb) { + // TODO: Enable this after adding InstrumentedThreadPool executor +// for (int i = 0; i < numExecutors; i++) { +// rb.addCounter(EXECUTOR_THREAD_CPU_TIME, executorThreadCpuTime[i].value()) +// .addCounter(EXECUTOR_THREAD_USER_TIME, executorThreadUserTime[i].value()) +// .addCounter(EXECUTOR_THREAD_SYSTEM_TIME, executorThreadSystemTime[i].value()); +// } + + rb.addCounter(EXECUTOR_TOTAL_REQUESTS_HANDLED, executorTotalRequestHandled.value()) + .addCounter(EXECUTOR_NUM_QUEUED_REQUESTS, executorNumQueuedRequests.value()) + .addCounter(EXECUTOR_TOTAL_SUCCESS, executorTotalSuccess.value()) + .addCounter(EXECUTOR_TOTAL_EXECUTION_FAILURE, executorTotalExecutionFailed.value()) + 
.addCounter(EXECUTOR_TOTAL_INTERRUPTED, executorTotalInterrupted.value()) + .addCounter(EXECUTOR_TOTAL_ASKED_TO_DIE, executorTotalAskedToDie.value()); + } + + public JvmMetrics getJvmMetrics() { + return jvmMetrics; + } + + public String getName() { + return name; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapMetricsSystem.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapMetricsSystem.java new file mode 100644 index 0000000..710182d --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapMetricsSystem.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; + +/** + * Metrics system for llap daemon. We do not use DefaultMetricsSystem here to safeguard against + * Tez accidentally shutting it down. 
+ */ +public enum LlapMetricsSystem { + INSTANCE; + + private AtomicReference impl = + new AtomicReference(new MetricsSystemImpl()); + + /** + * Convenience method to initialize the metrics system + * @param prefix for the metrics system configuration + * @return the metrics system instance + */ + public static MetricsSystem initialize(String prefix) { + return INSTANCE.impl.get().init(prefix); + } + + /** + * @return the metrics system object + */ + public static MetricsSystem instance() { + return INSTANCE.impl.get(); + } + + /** + * Shutdown the metrics system + */ + public static void shutdown() { + INSTANCE.impl.get().shutdown(); + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java new file mode 100644 index 0000000..124b8f2 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.UUID; + +/** + * Utility methods for metrics system. 
+ */ +public class MetricsUtils { + private static final String LOCALHOST = "localhost"; + + public static String getHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + return LOCALHOST; + } + } + + public static String getUUID() { + return String.valueOf(UUID.randomUUID()); + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java index 5a62999..bbe1a54 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hive.llap.cache; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -28,8 +31,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.junit.Test; -import static org.junit.Assert.*; public class TestBuddyAllocator { private static final Log LOG = LogFactory.getLog(TestBuddyAllocator.class); @@ -57,7 +60,8 @@ public void testVariableSizeMultiAllocs() { public void testSameSizes() { int min = 3, max = 8, maxAlloc = 1 << max; Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, maxAlloc); - BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager()); + BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1")); for (int i = max; i >= min; --i) { allocSameSize(a, 1 << (max - i), i); } @@ -67,7 +71,8 @@ public void testSameSizes() { public void testMultipleArenas() { int max = 8, maxAlloc = 1 << max, allocLog2 = max - 
1, arenaCount = 5; Configuration conf = createConf(1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount); - BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager()); + BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1")); allocSameSize(a, arenaCount * 2, allocLog2); } @@ -75,7 +80,8 @@ public void testMultipleArenas() { public void testMTT() { final int min = 3, max = 8, maxAlloc = 1 << max, allocsPerSize = 3; Configuration conf = createConf(1 << min, maxAlloc, maxAlloc * 8, maxAlloc * 24); - final BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager()); + final BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1")); ExecutorService executor = Executors.newFixedThreadPool(3); final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1); FutureTask upTask = new FutureTask(new Runnable() { @@ -124,7 +130,8 @@ private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cd private void testVariableSizeInternal(int allocCount, int arenaSizeMult, int arenaCount) { int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult; Configuration conf = createConf(1 << min, maxAlloc, arenaSize, arenaSize * arenaCount); - BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager()); + BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(), + LlapDaemonCacheMetrics.create("test", "1")); allocateUp(a, min, max, allocCount, true); allocateDown(a, min, max, allocCount, true); allocateDown(a, min, max, allocCount, false); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java index 915a577..0650199 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java +++ 
llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.hive.llap.cache; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + import java.util.Arrays; import java.util.Random; import java.util.concurrent.Callable; @@ -34,11 +39,10 @@ import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk; import org.junit.Test; -import static org.junit.Assert.*; - public class TestLowLevelCacheImpl { private static final Log LOG = LogFactory.getLog(TestLowLevelCacheImpl.class); @@ -47,7 +51,7 @@ public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) { for (int i = 0; i < dest.length; ++i) { LlapCacheableBuffer buf = new LlapCacheableBuffer(); - buf.initialize(0, null, -1, size); + buf.initialize(0, null, -1, size, null); dest[i] = buf; } return true; @@ -88,7 +92,8 @@ public void setEvictionListener(EvictionListener listener) { public void testGetPut() { Configuration conf = createConf(); LowLevelCacheImpl cache = new LowLevelCacheImpl( - conf, new DummyCachePolicy(), new DummyAllocator(), -1); // no cleanup thread + LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(), + new DummyAllocator(), -1); // no cleanup thread long fn1 = 1, fn2 = 2; LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() }; verifyRefcount(fakes, 1, 1, 1, 1, 1, 1); @@ -146,7 +151,8 @@ private void verifyCacheGet(LowLevelCacheImpl cache, long fileId, Object... 
stuf public void testMultiMatch() { Configuration conf = createConf(); LowLevelCacheImpl cache = new LowLevelCacheImpl( - conf, new DummyCachePolicy(), new DummyAllocator(), -1); // no cleanup thread + LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(), + new DummyAllocator(), -1); // no cleanup thread long fn = 1; LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb() }; assertNull(cache.putFileData(fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0)); @@ -164,7 +170,8 @@ public void testMultiMatch() { public void testStaleValueGet() { Configuration conf = createConf(); LowLevelCacheImpl cache = new LowLevelCacheImpl( - conf, new DummyCachePolicy(), new DummyAllocator(), -1); // no cleanup thread + LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(), + new DummyAllocator(), -1); // no cleanup thread long fn1 = 1, fn2 = 2; LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb() }; assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0)); @@ -183,7 +190,8 @@ public void testStaleValueGet() { public void testStaleValueReplace() { Configuration conf = createConf(); LowLevelCacheImpl cache = new LowLevelCacheImpl( - conf, new DummyCachePolicy(), new DummyAllocator(), -1); // no cleanup thread + LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(), + new DummyAllocator(), -1); // no cleanup thread long fn1 = 1, fn2 = 2; LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() }; @@ -202,7 +210,7 @@ public void testStaleValueReplace() { public void testMTTWithCleanup() { Configuration conf = createConf(); final LowLevelCacheImpl cache = new LowLevelCacheImpl( - conf, new DummyCachePolicy(), new DummyAllocator(), 1); + LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(), new DummyAllocator(), 1); final long fn1 = 1, fn2 = 2; final int offsetsToUse = 8; final CountDownLatch cdlIn = new CountDownLatch(4), cdlOut = new 
CountDownLatch(1); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 831bdec..efa3266 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -17,6 +17,13 @@ */ package org.apache.hadoop.hive.llap.cache; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -26,9 +33,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.junit.Assume; import org.junit.Test; -import static org.junit.Assert.*; public class TestLowLevelLrfuCachePolicy { private static final Log LOG = LogFactory.getLog(TestLowLevelLrfuCachePolicy.class); @@ -72,7 +79,8 @@ public void testLfuExtreme() { conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f); EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf); - LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lfu); + LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lfu, + LlapDaemonCacheMetrics.create("test", "1")); lfu.setEvictionListener(et); for (int i = 0; i < heapSize; ++i) { LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake(); @@ -108,7 +116,8 @@ public void testLruExtreme() { conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); EvictionTracker et = new EvictionTracker(); 
LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf); - LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lru); + LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lru, + LlapDaemonCacheMetrics.create("test", "1")); lru.setEvictionListener(et); for (int i = 0; i < heapSize; ++i) { LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake(); @@ -134,7 +143,8 @@ public void testDeadlockResolution() { EvictionTracker et = new EvictionTracker(); Configuration conf = createConf(1, heapSize); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf); - LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu); + LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu, + LlapDaemonCacheMetrics.create("test", "1")); lrfu.setEvictionListener(et); for (int i = 0; i < heapSize; ++i) { LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake(); @@ -189,7 +199,8 @@ private void testHeapSize(int heapSize) { conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements EvictionTracker et = new EvictionTracker(); LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf); - LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu); + LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu, + LlapDaemonCacheMetrics.create("test", "1")); lrfu.setEvictionListener(et); // Insert the number of elements plus 2, to trigger 2 evictions. int toEvict = 2;