diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java index d33f724..ca1df5c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java @@ -186,9 +186,7 @@ private static void addCollectionEstimator(HashMap, ObjectEstimator> by fieldCol = (Collection)fieldObj; if (fieldCol.size() == 0) { fieldCol = null; - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Empty collection " + field); - } + LlapIoImpl.LOG.info("Empty collection {}", field); } } if (fieldCol != null) { @@ -219,9 +217,7 @@ private static void addMapEstimator(HashMap, ObjectEstimator> byType, fieldCol = (Map)fieldObj; if (fieldCol.size() == 0) { fieldCol = null; - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Empty map " + field); - } + LlapIoImpl.LOG.info("Empty map {}", field); } } if (fieldCol != null) { @@ -257,15 +253,11 @@ private static void addMapEstimator(HashMap, ObjectEstimator> byType, return new Class[] { (Class)types[0], (Class)types[1] }; } else { // TODO: we could try to get the declaring object and infer argument... stupid Java. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Cannot determine map type: " + field); - } + LlapIoImpl.LOG.info("Cannot determine map type: {}", field); } } else { // TODO: we could try to get superclass or generic interfaces. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Non-parametrized map type: " + field); - } + LlapIoImpl.LOG.info("Non-parametrized map type: {}", field); } return null; } @@ -279,15 +271,11 @@ private static void addMapEstimator(HashMap, ObjectEstimator> byType, return (Class)type; } else { // TODO: we could try to get the declaring object and infer argument... stupid Java. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Cannot determine collection type: " + field); - } + LlapIoImpl.LOG.info("Cannot determine collection type: {}", field); } } else { // TODO: we could try to get superclass or generic interfaces. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Non-parametrized collection type: " + field); - } + LlapIoImpl.LOG.info("Non-parametrized collection type: {}", field); } return null; } @@ -297,11 +285,9 @@ private static void addArrayEstimator( Field field, Object fieldObj) { if (fieldObj == null) return; int arrayLen = Array.getLength(fieldObj); - if (arrayLen == 0) { - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Empty array " + field); - } - } + if (arrayLen == 0) { + LlapIoImpl.LOG.info("Empty array {}", field); + } for (int i = 0; i < arrayLen; ++i) { Object element = Array.get(fieldObj, i); if (element != null) { @@ -416,10 +400,8 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, ObjectEstimator collEstimator = parent.get(fieldObj.getClass()); if (collEstimator == null) { // We have no estimator for this type... assume low overhead and hope for the best.
- if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Approximate estimation for collection " - + fieldObj.getClass().getName() + " from " + e.field); - } + LlapIoImpl.LOG.info("Approximate estimation for collection {} from {}", + fieldObj.getClass().getName(), e.field); referencedSize += memoryModel.object(); referencedSize += estimateCollectionElements(parent, c, e.field, uniqueObjects); referencedSize += memoryModel.array() + c.size() * memoryModel.ref(); @@ -429,10 +411,8 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, referencedSize += ((CollectionEstimator)collEstimator).estimateOverhead(c.size()); } else { // We decided to treat this collection as regular object. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Verbose estimation for collection " - + fieldObj.getClass().getName() + " from " + e.field); - } + LlapIoImpl.LOG.info("Verbose estimation for collection {} from {}", + fieldObj.getClass().getName(), e.field); referencedSize += collEstimator.estimate(c, parent, uniqueObjects); } break; @@ -442,10 +422,8 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, ObjectEstimator collEstimator = parent.get(fieldObj.getClass()); if (collEstimator == null) { // We have no estimator for this type... assume low overhead and hope for the best. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Approximate estimation for map " - + fieldObj.getClass().getName() + " from " + e.field); - } + LlapIoImpl.LOG.info("Approximate estimation for map {} from {}", + fieldObj.getClass().getName(), e.field); referencedSize += memoryModel.object(); referencedSize += estimateMapElements(parent, m, e.field, uniqueObjects); referencedSize += memoryModel.array() + m.size() @@ -456,10 +434,8 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, referencedSize += ((CollectionEstimator)collEstimator).estimateOverhead(m.size()); } else { // We decided to treat this map as regular object.
- if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Verbose estimation for map " - + fieldObj.getClass().getName() + " from " + e.field); - } + LlapIoImpl.LOG.info("Verbose estimation for map {} from {}", + fieldObj.getClass().getName(), e.field); referencedSize += collEstimator.estimate(m, parent, uniqueObjects); } break; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java index 840aeab..78f0b37 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java @@ -83,8 +83,8 @@ int incRef() { newRefCount = oldRefCount + 1; if (refCount.compareAndSet(oldRefCount, newRefCount)) break; } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locked " + this + "; new ref count " + newRefCount); + if (LlapIoImpl.LOCKING_LOGGER.isInfoEnabled()) { + LlapIoImpl.LOCKING_LOGGER.info("Locked {}; new ref count {}", this, newRefCount); } return newRefCount; } @@ -109,8 +109,8 @@ public boolean isInvalid() { int decRef() { int newRefCount = refCount.decrementAndGet(); - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Unlocked " + this + "; refcount " + newRefCount); + if (LlapIoImpl.LOCKING_LOGGER.isInfoEnabled()) { + LlapIoImpl.LOCKING_LOGGER.info("Unlocked {}; refcount {}", this, newRefCount); } if (newRefCount < 0) { throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this); @@ -128,8 +128,8 @@ public boolean invalidate() { if (value != 0) return false; if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break; } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Invalidated " + this + " due to eviction"); + if (LlapIoImpl.LOCKING_LOGGER.isInfoEnabled()) { + LlapIoImpl.LOCKING_LOGGER.info("Invalidated {} due to eviction", this); } return true; } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java index 1132171..1a082a1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java @@ -58,9 +58,8 @@ public LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cac @VisibleForTesting LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy, EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) { - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Low level cache; cleanup interval " + cleanupInterval + "sec"); - } + + LlapIoImpl.LOG.info("Low level cache; cleanup interval {} sec", cleanupInterval); this.cachePolicy = cachePolicy; this.allocator = allocator; this.cleanupInterval = cleanupInterval; @@ -148,9 +147,7 @@ private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCache LlapDataBuffer buffer = e.getValue(); long requestedLength = currentNotCached.getLength(); // Lock the buffer, validate it and add to results. - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locking " + buffer + " during get"); - } + LlapIoImpl.LOCKING_LOGGER.info("Locking {} during get", buffer); if (!lockBuffer(buffer, true)) { // If we cannot lock, remove this from cache and continue. 
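For context on the pattern applied in the hunks above: SLF4J's {} placeholders defer rendering of the arguments until the logger has checked its level, so the old DebugUtils.isTrace*Enabled() guards around cheap, single-argument messages can simply be dropped. An explicit isInfoEnabled() check is only worth keeping where an argument is itself expensive to compute, as with the DebugUtils.toString(...) calls over include arrays later in this patch. A minimal sketch of both cases, assuming only slf4j-api on the classpath; the class, methods and renderBooleans helper are hypothetical stand-ins, not code from the patch:

    import java.lang.reflect.Field;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ParameterizedLoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

      static void logEmptyCollection(Field field) {
        // Old style: "Empty collection " + field was concatenated even when INFO was off,
        // which is why it had to sit behind a DebugUtils guard.
        // New style: field.toString() only runs if INFO is enabled for this logger.
        LOG.info("Empty collection {}", field);
      }

      static void logIncludes(int stripeIx, boolean[] includes) {
        // Here the argument itself is costly to build, so an explicit level check still pays off.
        if (LOG.isInfoEnabled()) {
          LOG.info("Caching stripe {} metadata with includes: {}", stripeIx, renderBooleans(includes));
        }
      }

      // Hypothetical stand-in for DebugUtils.toString(boolean[]).
      private static String renderBooleans(boolean[] a) {
        StringBuilder sb = new StringBuilder("[");
        for (boolean b : a) {
          sb.append(b ? '1' : '0');
        }
        return sb.append(']').toString();
      }
    }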
@@ -183,7 +180,6 @@ private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCache * Adds cached buffer to buffer list. * @param currentNotCached Pointer to the list node where we are inserting. * @param currentCached The cached buffer found for this node, to insert. - * @param resultObj * @return The new currentNotCached pointer, following the cached buffer insertion. */ private DiskRangeList addCachedBufferToIter( @@ -240,9 +236,7 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { try { for (int i = 0; i < ranges.length; ++i) { LlapDataBuffer buffer = (LlapDataBuffer)buffers[i]; - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locking " + buffer + " at put time"); - } + LlapIoImpl.LOCKING_LOGGER.info("Locking {} at put time", buffer); boolean canLock = lockBuffer(buffer, false); assert canLock; long offset = ranges[i].getOffset() + baseOffset; @@ -258,14 +252,11 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { } break; } - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for " - + fileId + "@" + offset + " (base " + baseOffset + "); old " + oldVal - + ", new " + buffer); - } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locking " + oldVal + " due to cache collision"); + if (LlapIoImpl.CACHE_LOGGER.isInfoEnabled()) { + LlapIoImpl.CACHE_LOGGER.info("Trying to cache when the chunk is already cached for" + + " {}@{} (base {}); old {}, new {}", fileId, offset, baseOffset, oldVal, buffer); } + LlapIoImpl.LOCKING_LOGGER.info("Locking {} due to cache collision", oldVal); if (lockBuffer(oldVal, true)) { // We don't do proper overlap checking because it would cost cycles and we // think it will never happen. We do perform the most basic check here. @@ -275,8 +266,9 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { + " (base " + baseOffset + ")"); } // We found an old, valid block for this key in the cache. 
- if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Unlocking " + buffer + " due to cache collision with " + oldVal); + if (LlapIoImpl.LOCKING_LOGGER.isInfoEnabled()) { + LlapIoImpl.LOCKING_LOGGER.info("Unlocking {} due to cache collision with {}", + buffer, oldVal); } unlockBuffer(buffer, false); @@ -353,8 +345,8 @@ private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) { if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) { cachePolicy.notifyUnlock(buffer); } else { - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached"); + if (LlapIoImpl.LOCKING_LOGGER.isInfoEnabled()) { + LlapIoImpl.LOCKING_LOGGER.info("Deallocating {} that was not cached", buffer); } allocator.deallocate(buffer); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index 1cfe2bc..4def4a1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -50,9 +50,9 @@ public LowLevelCacheMemoryManager( this.usedMemory = new AtomicLong(0); this.metrics = metrics; metrics.setCacheCapacityTotal(maxSize); - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Memory manager initialized with max size " + maxSize + " and " - + ((evictor == null) ? "no " : "") + "ability to evict blocks"); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("Memory manager initialized with max size {} and" + + " {} ability to evict blocks", maxSize, ((evictor == null) ? "no " : "")); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java index 1430eae..846f550 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java @@ -34,10 +34,8 @@ private EvictionListener evictionListener; private LlapOomDebugDump parentDebugDump; - public LowLevelFifoCachePolicy(Configuration conf) { - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("FIFO cache policy"); - } + public LowLevelFifoCachePolicy() { + LlapIoImpl.LOG.info("FIFO cache policy"); buffers = new LinkedList(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 6f52b86..40d57a0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -22,10 +22,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; @@ -66,14 +64,9 @@ private final double expirePriority(long time, long lastAccess, double previous) private EvictionListener evictionListener; private LlapOomDebugDump 
parentDebugDump; - public LowLevelLrfuCachePolicy(Configuration conf) { - this((int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC), - HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE), conf); - } - @VisibleForTesting - public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) { - lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); + public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, double lambda) { + this.lambda = lambda; int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize); int maxHeapSize = -1; if (lambda == 0) { @@ -82,9 +75,9 @@ public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration co int lrfuThreshold = (int)((Math.log(1 - Math.pow(0.5, lambda)) / Math.log(0.5)) / lambda); maxHeapSize = Math.min(lrfuThreshold, maxBuffers); } - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("LRFU cache policy with min buffer size " + minBufferSize - + " and lambda " + lambda + " (heap size " + maxHeapSize + ")"); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("LRFU cache policy with min buffer size {} and lambda {} (heap size {})", + minBufferSize, lambda, maxHeapSize); } heap = new LlapCacheableBuffer[maxHeapSize]; @@ -123,8 +116,8 @@ public void notifyLock(LlapCacheableBuffer buffer) { @Override public void notifyUnlock(LlapCacheableBuffer buffer) { long time = timer.incrementAndGet(); - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Touching " + buffer + " at " + time); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("Touching {} at {}", buffer, time); } synchronized (heap) { // First, update buffer priority - we have just been using it. @@ -263,8 +256,8 @@ private LlapCacheableBuffer evictFromHeapUnderLock(long time) { while (true) { if (heapSize == 0) return null; LlapCacheableBuffer result = heap[0]; - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Evicting " + result + " at " + time); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("Evicting {} at {}", result, time); } result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; --heapSize; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java index 734a5c0..70aded6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java @@ -30,9 +30,7 @@ private final LlapDaemonCacheMetrics metrics; public SimpleBufferManager(Allocator allocator, LlapDaemonCacheMetrics metrics) { - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Simple buffer manager"); - } + LlapIoImpl.LOG.info("Simple buffer manager"); this.allocator = allocator; this.metrics = metrics; } @@ -46,8 +44,8 @@ private boolean lockBuffer(LlapDataBuffer buffer) { private void unlockBuffer(LlapDataBuffer buffer) { if (buffer.decRef() == 0) { - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached"); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("Deallocating {} that was not cached", buffer); } allocator.deallocate(buffer); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 85cca97..e3dbdad 100644 --- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -264,7 +264,7 @@ ColumnVectorBatch nextCvb() throws InterruptedException, IOException { } synchronized (pendingData) { // We are waiting for next block. Either we will get it, or be told we are done. - boolean doLogBlocking = DebugUtils.isTraceMttEnabled() && isNothingToReport(); + boolean doLogBlocking = LlapIoImpl.LOG.isInfoEnabled() && isNothingToReport(); if (doLogBlocking) { LlapIoImpl.LOG.info("next will block"); } @@ -277,8 +277,8 @@ ColumnVectorBatch nextCvb() throws InterruptedException, IOException { rethrowErrorIfAny(); lastCvb = pendingData.poll(); } - if (DebugUtils.isTraceMttEnabled() && lastCvb != null) { - LlapIoImpl.LOG.info("Processing will receive vector " + lastCvb); + if (LlapIoImpl.LOG.isInfoEnabled() && lastCvb != null) { + LlapIoImpl.LOG.info("Processing will receive vector {}", lastCvb); } return lastCvb; } @@ -304,9 +304,9 @@ public long getPos() throws IOException { @Override public void close() throws IOException { - if (DebugUtils.isTraceMttEnabled()) { - LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + isDone - + ", err " + pendingError + ", pending " + pendingData.size()); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("close called; closed {}, done {}, err {}, pending {}", + isClosed, isDone, pendingError, pendingData.size()); } LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged! feedback.stop(); @@ -323,9 +323,9 @@ private void rethrowErrorIfAny() throws IOException { @Override public void setDone() { - if (DebugUtils.isTraceMttEnabled()) { - LlapIoImpl.LOG.info("setDone called; closed " + isClosed - + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("setDone called; closed {}, done {}, err {}, pending {}", + isClosed, isDone, pendingError, pendingData.size()); } synchronized (pendingData) { isDone = true; @@ -335,9 +335,9 @@ public void setDone() { @Override public void consumeData(ColumnVectorBatch data) { - if (DebugUtils.isTraceMttEnabled()) { - LlapIoImpl.LOG.info("consume called; closed " + isClosed + ", done " + isDone - + ", err " + pendingError + ", pending " + pendingData.size()); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("consume called; closed {}, done {}, err {}, pending {}", + isClosed, isDone, pendingError, pendingData.size()); } synchronized (pendingData) { if (isClosed) { @@ -351,8 +351,8 @@ public void consumeData(ColumnVectorBatch data) { @Override public void setError(Throwable t) { counters.incrCounter(LlapIOCounters.NUM_ERRORS); - LlapIoImpl.LOG.info("setError called; closed " + isClosed - + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); + LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}", + isClosed, isDone, pendingError, pendingData.size()); assert t != null; synchronized (pendingData) { pendingError = t; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index d2c1907..1694e25 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -18,8 +18,6 @@ package 
org.apache.hadoop.hive.llap.io.api.impl; -import org.apache.hadoop.hive.llap.LogLevels; - import java.io.IOException; import java.util.concurrent.Executors; @@ -60,8 +58,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LlapIoImpl implements LlapIo { - public static final Logger LOG = LoggerFactory.getLogger(LlapIoImpl.class); - public static final LogLevels LOGL = new LogLevels(LOG); + public static final Logger LOG = LoggerFactory.getLogger("LlapIoImpl"); + public static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc"); + public static final Logger CACHE_LOGGER = LoggerFactory.getLogger("LlapIoCache"); + public static final Logger LOCKING_LOGGER = LoggerFactory.getLogger("LlapIoLocking"); + private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator"; private final ColumnVectorProducer cvp; @@ -75,9 +76,7 @@ private LlapIoImpl(Configuration conf) throws IOException { String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE); boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode), useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode); - if (LOGL.isInfoEnabled()) { - LOG.info("Initializing LLAP IO in " + ioMode + " mode"); - } + LOG.info("Initializing LLAP IO in {} mode", ioMode); String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName(); String sessionId = conf.get("llap.daemon.metrics.sessionid"); @@ -88,8 +87,8 @@ private LlapIoImpl(Configuration conf) throws IOException { HiveConf.ConfVars.LLAP_QUEUE_METRICS_PERCENTILE_INTERVALS)); this.queueMetrics = LlapDaemonQueueMetrics.create(displayName, sessionId, intervals); - LOG.info("Started llap daemon metrics with displayName: " + displayName + - " sessionId: " + sessionId); + LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", displayName, + sessionId); Cache cache = null; // High-level cache is not implemented or supported. @@ -99,8 +98,15 @@ private LlapIoImpl(Configuration conf) throws IOException { if (useLowLevelCache) { // Memory manager uses cache policy to trigger evictions, so create the policy first. boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU); - LowLevelCachePolicy cachePolicy = - useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf); + LowLevelCachePolicy cachePolicy; + if (useLrfu) { + int minAlloc = (int) HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); + long memMaxSize = HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE); + double lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); + cachePolicy = new LowLevelLrfuCachePolicy(minAlloc, memMaxSize, lambda); + } else { + cachePolicy = new LowLevelFifoCachePolicy(); + } // Allocator uses memory manager to request memory, so create the manager next. LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager( conf, cachePolicy, cacheMetrics); @@ -132,9 +138,7 @@ private LlapIoImpl(Configuration conf) throws IOException { // TODO: this should depends on input format and be in a map, or something. 
this.cvp = new OrcColumnVectorProducer( metadataCache, orcCache, bufferManager, cache, conf, cacheMetrics, queueMetrics); - if (LOGL.isInfoEnabled()) { - LOG.info("LLAP IO initialized"); - } + LOG.info("LLAP IO initialized"); registerMXBeans(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 18191da..f06c681 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -52,9 +52,7 @@ public OrcColumnVectorProducer(OrcMetadataCache metadataCache, LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, Cache cache, Configuration conf, LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) { - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Initializing ORC column vector producer"); - } + LlapIoImpl.LOG.info("Initializing ORC column vector producer"); this.metadataCache = metadataCache; this.lowLevelCache = lowLevelCache; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index bcee56b..dd87357 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -188,9 +188,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff @Override public void stop() { - if (LOG.isDebugEnabled()) { - LOG.debug("Encoded reader is being stopped"); - } + LOG.info("Encoded reader is being stopped"); isStopped = true; } @@ -218,9 +216,7 @@ public Void run() throws Exception { protected Void performDataRead() throws IOException { long startTime = counters.startTimeCounter(); - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Processing data for " + split.getPath()); - } + LlapIoImpl.LOG.info("Processing data for {}", split.getPath()); if (processStop()) { recordReaderTime(startTime); return null; @@ -328,7 +324,7 @@ protected Void performDataRead() throws IOException { // Reader creating updates HDFS counters, don't do it here. DataWrapperForOrc dw = new DataWrapperForOrc(); stripeReader = orcReader.encodedReader(fileId, dw, dw, POOL_FACTORY); - stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled()); + stripeReader.setDebugTracing(LlapIoImpl.ORC_LOGGER.isDebugEnabled()); } catch (Throwable t) { consumer.setError(t); recordReaderTime(startTime); @@ -357,10 +353,8 @@ protected Void performDataRead() throws IOException { if (cols != null && cols.isEmpty()) continue; // No need to read this stripe. stripe = fileMetadata.getStripes().get(stripeIx); - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Reading stripe " + stripeIx + ": " - + stripe.getOffset() + ", " + stripe.getLength()); - } + LlapIoImpl.ORC_LOGGER.info("Reading stripe {}: {}, {}", stripeIx, stripe.getOffset(), + stripe.getLength()); colRgs = readState[stripeIxMod]; // We assume that NO_RGS value is only set from SARG filter and for all columns; // intermediate changes for individual columns will unset values in the array. 
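The LlapIoImpl hunk above also replaces the single class-based logger with string-named loggers (LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking), so the very chatty locking, cache and ORC traces can be switched on per concern through log4j2 instead of the removed compile-time DebugUtils flags. A rough sketch of that arrangement, assuming only slf4j-api; the method and message below are illustrative rather than taken from the patch:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ComponentLoggerSketch {
      // Loggers addressed by name, not by class, mirroring the declarations in LlapIoImpl;
      // this keeps the log4j2 configuration keys stable even if implementation classes move.
      static final Logger CACHE_LOGGER = LoggerFactory.getLogger("LlapIoCache");
      static final Logger LOCKING_LOGGER = LoggerFactory.getLogger("LlapIoLocking");

      static void onCacheCollision(Object key, long offset, Object oldBuffer, Object newBuffer) {
        // Multi-argument message: the guard also skips the varargs array and boxing when disabled.
        if (CACHE_LOGGER.isInfoEnabled()) {
          CACHE_LOGGER.info("Chunk already cached for {}@{}; old {}, new {}",
              key, offset, oldBuffer, newBuffer);
        }
        // Cheap one-argument message: no guard needed, the placeholder defers toString().
        LOCKING_LOGGER.info("Locking {} due to cache collision", oldBuffer);
      }
    }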
@@ -398,18 +392,18 @@ protected Void performDataRead() throws IOException { counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs); if (hasFileId && metadataCache != null) { stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata); - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx - + " metadata with includes: " + DebugUtils.toString(stripeIncludes)); + if (LlapIoImpl.ORC_LOGGER.isInfoEnabled()) { + LlapIoImpl.ORC_LOGGER.info("Caching stripe {} metadata with includes: {}", + stripeKey.stripeIx, DebugUtils.toString(stripeIncludes)); } } } consumer.setStripeMetadata(stripeMetadata); } if (!stripeMetadata.hasAllIndexes(stripeIncludes)) { - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx - + " metadata for includes: " + DebugUtils.toString(stripeIncludes)); + if (LlapIoImpl.ORC_LOGGER.isInfoEnabled()) { + LlapIoImpl.ORC_LOGGER.info("Updating indexes in stripe {} metadata for includes: {}", + stripeKey.stripeIx, DebugUtils.toString(stripeIncludes)); } assert isFoundInCache; counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); @@ -451,9 +445,8 @@ protected Void performDataRead() throws IOException { // Done with all the things. recordReaderTime(startTime); dataConsumer.setDone(); - if (DebugUtils.isTraceMttEnabled()) { - LlapIoImpl.LOG.info("done processing " + split); - } + + LlapIoImpl.LOG.info("done processing {}", split); // Close the stripe reader, we are done reading. cleanupReaders(); @@ -603,9 +596,7 @@ private void ensureOrcReader() throws IOException { if (fileId != null && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) { path = HdfsUtils.getFileIdPath(fs, path, fileId); } - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Creating reader for " + path + " (" + split.getPath() + ")"); - } + LlapIoImpl.ORC_LOGGER.info("Creating reader for {} ({})", path, split.getPath()); long startTime = counters.startTimeCounter(); ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata); orcReader = EncodedOrcFile.createReader(path, opts); @@ -660,17 +651,17 @@ private OrcFileMetadata getOrReadFileMetadata() throws IOException { counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); if (hasFileId && metadataCache != null) { value = metadataCache.putStripeMetadata(value); - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx - + " metadata with includes: " + DebugUtils.toString(globalInc)); + if (LlapIoImpl.ORC_LOGGER.isInfoEnabled()) { + LlapIoImpl.ORC_LOGGER.info("Caching stripe {} metadata with includes: {}", + stripeKey.stripeIx, DebugUtils.toString(globalInc)); } } } // We might have got an old value from cache; recheck it has indexes. 
if (!value.hasAllIndexes(globalInc)) { - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx - + " metadata for includes: " + DebugUtils.toString(globalInc)); + if (LlapIoImpl.ORC_LOGGER.isInfoEnabled()) { + LlapIoImpl.ORC_LOGGER.info("Updating indexes in stripe {} metadata for includes: {}", + stripeKey.stripeIx, DebugUtils.toString(globalInc)); } updateLoadedIndexes(value, si, globalInc, sargColumns); } @@ -697,9 +688,9 @@ public void returnData(OrcEncodedColumnBatch ecb) { if (datas == null) continue; for (ColumnStreamData data : datas) { if (data == null || data.decRef() != 0) continue; - if (DebugUtils.isTraceLockingEnabled()) { + if (LlapIoImpl.LOCKING_LOGGER.isInfoEnabled()) { for (MemoryBuffer buf : data.getCacheBuffers()) { - LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing"); + LlapIoImpl.LOCKING_LOGGER.info("Unlocking {} at the end of processing", buf); } } bufferManager.decRefBuffers(data.getCacheBuffers()); @@ -738,14 +729,14 @@ private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, boolean isNone = rgsToRead == SargApplier.READ_NO_RGS, isAll = rgsToRead == SargApplier.READ_ALL_RGS; hasAnyData = hasAnyData || !isNone; - if (DebugUtils.isTraceOrcEnabled()) { + if (LlapIoImpl.ORC_LOGGER.isInfoEnabled()) { if (isNone) { - LlapIoImpl.LOG.info("SARG eliminated all RGs for stripe " + stripeIx); + LlapIoImpl.ORC_LOGGER.info("SARG eliminated all RGs for stripe {}", stripeIx); } else if (!isAll) { - LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": " - + DebugUtils.toString(rgsToRead)); + LlapIoImpl.ORC_LOGGER.info("SARG picked RGs for stripe {}: {}", + stripeIx, DebugUtils.toString(rgsToRead)); } else { - LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " + stripeIx); + LlapIoImpl.ORC_LOGGER.info("Will read all {} RGs for stripe {}", rgCount, stripeIx); } } assert isAll || isNone || rgsToRead.length == rgCount; @@ -788,12 +779,12 @@ public void determineStripesToRead() { long offset = split.getStart(), maxOffset = offset + split.getLength(); stripeIxFrom = -1; int stripeIxTo = -1; - if (LlapIoImpl.LOGL.isDebugEnabled()) { + if (LlapIoImpl.ORC_LOGGER.isDebugEnabled()) { String tmp = "FileSplit {" + split.getStart() + ", " + split.getLength() + "}; stripes "; for (StripeInformation stripe : stripes) { tmp += "{" + stripe.getOffset() + ", " + stripe.getLength() + "}, "; } - LlapIoImpl.LOG.debug(tmp); + LlapIoImpl.ORC_LOGGER.debug(tmp); } int stripeIx = 0; @@ -805,33 +796,25 @@ public void determineStripesToRead() { continue; } if (stripeIxFrom == -1) { - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Including stripes from " + stripeIx - + " (" + stripeStart + " >= " + offset + ")"); - } + LlapIoImpl.ORC_LOGGER.info("Including stripes from {} ({} >= {})", + stripeIx, stripeStart, offset); stripeIxFrom = stripeIx; } if (stripeStart >= maxOffset) { stripeIxTo = stripeIx; - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Including stripes until " + stripeIxTo + " (" + stripeStart - + " >= " + maxOffset + "); " + (stripeIxTo - stripeIxFrom) + " stripes"); - } + LlapIoImpl.ORC_LOGGER.info("Including stripes until {} ({} >= {}); {} stripes", + stripeIxTo, stripeStart, maxOffset, (stripeIxTo - stripeIxFrom)); break; } ++stripeIx; } if (stripeIxFrom == -1) { - if (LlapIoImpl.LOG.isInfoEnabled()) { - LlapIoImpl.LOG.info("Not including any stripes - empty split"); - } + LlapIoImpl.LOG.info("Not including any stripes - empty split"); } 
if (stripeIxTo == -1 && stripeIxFrom != -1) { stripeIxTo = stripeIx; - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Including stripes until " + stripeIx + " (end of file); " - + (stripeIxTo - stripeIxFrom) + " stripes"); - } + LlapIoImpl.ORC_LOGGER.info("Including stripes until {} (end of file); {} stripes", + stripeIx, (stripeIxTo - stripeIxFrom)); } readState = new boolean[stripeIxTo - stripeIxFrom][][]; } @@ -988,9 +971,9 @@ public DiskRangeList readFileData(DiskRangeList range, long baseOffset, long startTime = counters.startTimeCounter(); DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect); counters.recordHdfsTime(startTime); - if (DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled()) { - LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + baseOffset - + "): " + RecordReaderUtils.stringifyDiskRanges(result)); + if (LlapIoImpl.ORC_LOGGER.isInfoEnabled()) { + LlapIoImpl.ORC_LOGGER.info("Disk ranges after disk read (file {}, base offset {}): {}", + fileId, baseOffset, RecordReaderUtils.stringifyDiskRanges(result)); } return result; } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferInProgress.java b/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferInProgress.java deleted file mode 100644 index 9782b81..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferInProgress.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.old; - -import org.apache.hadoop.hive.llap.old.BufferPool.WeakBuffer; -import org.apache.hadoop.hive.llap.old.ChunkPool.Chunk; - -/** - * Helper struct that is used by loaders (e.g. OrcLoader) and chunk writer to write chunks. - */ -public class BufferInProgress { - /** Buffer that is being written to. */ - public final WeakBuffer buffer; - /** Offset in buffer where writing can proceed */ - public int offset; // TODO: use WB's position; these have separate lifecycle now, needed? - private final int bufferLimit; - - /** The chunk that is currently being written. */ - private Chunk chunkInProgress = null; - /** The row count of the chunk currently being written. 
*/ - private int chunkInProgressRows = 0; - - public BufferInProgress(WeakBuffer buffer) { - this.buffer = buffer; - this.bufferLimit = buffer.getContents().limit(); - this.offset = 0; - } - - public Chunk ensureChunk() { - if (chunkInProgress == null) { - chunkInProgress = new Chunk(buffer, offset, 0); - chunkInProgressRows = 0; - } - return chunkInProgress; - } - - public Chunk extractChunk() { - Chunk result = chunkInProgress; - chunkInProgress = null; - chunkInProgressRows = 0; - return result; - } - - public void update(int newOffset, int rowsWritten) { - if (newOffset > bufferLimit) { - throw new AssertionError("Offset is beyond buffer limit: " + newOffset + "/" + bufferLimit - + "; previous offset " + offset + ", chunk " + chunkInProgress); - } - chunkInProgress.length += (newOffset - offset); - this.offset = newOffset; - this.chunkInProgressRows += rowsWritten; - } - - public int getChunkInProgressRows() { - return chunkInProgressRows; - } - - public int getSpaceLeft() { - return getSpaceLeft(-1); - } - - public int getSpaceLeft(int offset) { - offset = (offset >= 0) ? offset : this.offset; - return buffer.getContents().limit() - offset; - } -} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java deleted file mode 100644 index fc10b2b..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.hadoop.hive.llap.old; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.DebugUtils; -import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; - -import com.google.common.annotations.VisibleForTesting; - -public class BufferPool { - // TODO: we should keep evicted buffers for reuse. Perhaps that too should be factored out. - private final CachePolicy cachePolicy; - private final Object evictionNotifyObj = new Object(); - private int evictionIsWaiting; // best effort flag - private final long maxCacheSize; - private final int bufferSize; - - - public BufferPool(Configuration conf) { - this.maxCacheSize = 0;// HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_CACHE_SIZE); - this.bufferSize = 0; // HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_BUFFER_SIZE); - this.cachePolicy = null; - } - - /** - * Allocates a new buffer. Buffer starts out locked (assumption is that caller is going to - * write to it immediately and then unlock it; future writers/readers will lock and unlock). - * @return Buffer. 
- */ - public WeakBuffer allocateBuffer() throws InterruptedException { - // TODO: for now, dumb byte arrays. Should be off-heap. - ByteBuffer newBuffer = ByteBuffer.allocate(bufferSize); - WeakBuffer wb = new WeakBuffer(this, newBuffer); - // Don't touch the buffer - it's not in cache yet. cache() will set the initial priority. - if (!wb.lock(false)) { - throw new AssertionError("Cannot lock a new buffer"); - } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locked " + wb + " after creation"); - } - boolean hasWaited = false; - WeakBuffer evicted = null; - while (true) { - evicted = cachePolicy.cache(wb); - if (evicted != CachePolicy.CANNOT_EVICT) break; - if (DebugUtils.isTraceCachingEnabled() && !hasWaited) { - LlapIoImpl.LOG.info("Failed to add a new block to cache; waiting for blocks to be unlocked"); - hasWaited = true; - } - synchronized (evictionNotifyObj) { - ++evictionIsWaiting; - evictionNotifyObj.wait(1000); - --evictionIsWaiting; - } - } - if (DebugUtils.isTraceCachingEnabled() && hasWaited) { - LlapIoImpl.LOG.info("Eviction is done waiting"); - } - if (evicted != null) { - //if (evictionListener != null) { - // evictionListener.evictionNotice(evicted); - //} - // After eviction notice, the contents can be reset. - evicted.clear(); - } - return wb; - } - - private final void unblockEviction() { - if (evictionIsWaiting <= 0) return; - synchronized (evictionNotifyObj) { - if (evictionIsWaiting <= 0) return; - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Notifying eviction that some block has been unlocked"); - } - evictionNotifyObj.notifyAll(); - } - } - - @VisibleForTesting - public static WeakBuffer allocateFake() { - return new WeakBuffer(null, ByteBuffer.wrap(new byte[1])); - } - - /** - * This class serves 3 purposes: - * 1) it implements BufferPool-specific hashCode and equals (ByteBuffer ones are content-based); - * 2) it contains the refCount; - * 3) by extension from (2), it can be held while it is evicted; when locking before the usage, - * the fact that the data has been evicted will be discovered (similar to weak_ptr). - * Note: not static because when we wait for something to become evict-able, - * we need to receive notifications from unlock (see unlock). Otherwise could be static. - */ - public static final class WeakBuffer { - private static final int EVICTED_REFCOUNT = -1; - private final BufferPool parent; - private ByteBuffer contents; - private final AtomicInteger refCount = new AtomicInteger(0); - - // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object. - public double priority; - public long lastUpdate = -1; - public int indexInHeap = -1; - public boolean isLockedInHeap = false; - - private WeakBuffer(BufferPool parent, ByteBuffer contents) { - this.parent = parent; - this.contents = contents; - } - - public ByteBuffer getContents() { - assert isLocked() : "Cannot get contents with refCount " + refCount.get(); - return contents; - } - - @Override - public int hashCode() { - if (contents == null) return 0; - return System.identityHashCode(contents); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof WeakBuffer)) return false; - // We only compare objects, and not contents of the ByteBuffer. - // One ByteBuffer is never put in multiple WeakBuffer-s (that is the invariant). 
- return contents == ((WeakBuffer)obj).contents; - } - - public boolean lock(boolean doTouch) { - int oldRefCount = -1; - while (true) { - oldRefCount = refCount.get(); - if (oldRefCount == EVICTED_REFCOUNT) return false; - assert oldRefCount >= 0; - if (refCount.compareAndSet(oldRefCount, oldRefCount + 1)) break; - } - if (doTouch && oldRefCount == 0 && parent != null) { - parent.cachePolicy.notifyLock(this); - } - return true; - } - - public boolean isLocked() { - // Best-effort check. We cannot do a good check against caller thread, since - // refCount could still be > 0 if someone else locked. This is used for asserts. - return refCount.get() > 0; - } - - public boolean isInvalid() { - return refCount.get() == EVICTED_REFCOUNT; - } - - public boolean isCleared() { - return contents == null; - } - - public void unlock() { - int newRefCount = refCount.decrementAndGet(); - if (newRefCount < 0) { - throw new AssertionError("Unexpected refCount " + newRefCount); - } - // If this block became eligible, see if we need to unblock the eviction. - if (newRefCount == 0 && parent != null) { - parent.cachePolicy.notifyUnlock(this); - parent.unblockEviction(); - } - } - - @Override - public String toString() { - return "0x" + Integer.toHexString(hashCode()); - } - - /** - * @return Whether the we can invalidate; false if locked or already evicted. - */ - boolean invalidate() { - while (true) { - int value = refCount.get(); - if (value != 0) return false; - if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break; - } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Invalidated " + this + " due to eviction"); - } - return true; - } - - ByteBuffer clear() { - assert refCount.get() == EVICTED_REFCOUNT; - ByteBuffer result = contents; - contents = null; - return result; - } - - public String toStringForCache() { - return "[" + Integer.toHexString(hashCode()) + " " + String.format("%1$.2f", priority) + " " - + lastUpdate + " " + (isLocked() ? "!" : ".") + "]"; - } - } -} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/old/CachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/old/CachePolicy.java deleted file mode 100644 index cca42fe..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/old/CachePolicy.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.old; - -import org.apache.hadoop.hive.llap.old.BufferPool.WeakBuffer; - -public interface CachePolicy { - public static final WeakBuffer CANNOT_EVICT = BufferPool.allocateFake(); - - /** - * @param buffer Buffer to cache. - * @return Evicted buffer. All buffers are of the same size currently, so it is one or none. 
- * It can also be CANNOT_EVICT fake buffer, if we cannot evict and thus cache. - */ - WeakBuffer cache(WeakBuffer buffer); - void notifyLock(WeakBuffer buffer); - void notifyUnlock(WeakBuffer buffer); -} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java deleted file mode 100644 index 4f9f165..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.old; - -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.hive.llap.DebugUtils; -import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; -import org.apache.hadoop.hive.llap.old.BufferPool.WeakBuffer; -import org.apache.hadoop.hive.llap.old.ChunkPool.Chunk; - -/** - * This class contains the mapping of file chunks to buffers inside BufferPool. - */ -public class ChunkPool /*implements EvictionListener*/ { - private final ConcurrentHashMap chunkCache = new ConcurrentHashMap(); - - /** Number of unprocessed evictions, for the background thread. */ - private final AtomicInteger newEvictions = new AtomicInteger(0); - private final Thread cleanupThread; - - public ChunkPool() { - cleanupThread = new CleanupThread(); - cleanupThread.start(); - } - - /** - * Gets a chunk from cache - * TODO: We expect that in most cases, some related chunks (e.g. columns for a stripe) - * will be stored in the same buffer. We could use this to get keys more efficiently. - * On the other hand, real stripes are pretty big. - * @param key key to search for. - * @return Chunk corresponding to k. - */ - public Chunk getChunk(K key, HashSet lockedBuffers) { - while (true) { - Chunk result = chunkCache.get(key); - if (result == null) return null; - if (lockChunk(result, lockedBuffers)) return result; - if (chunkCache.remove(key, result)) return null; - } - } - - private boolean lockChunk(Chunk result, HashSet lockedBuffers) { - // We expect the chain to have 1 or 2 buffers (2 if we are on buffer boundary). Keep track of - // what we lock in the bitmask; may need fixing (extremely unlikely - 64+ buffer, giant chunks) - boolean failedToLock = false; - long blocksToUnlock = 0; - long bit = 1 << 63; // The bit indicating that current chunk was locked. 
- - Chunk chunk = result; - while (chunk != null) { - if (lockedBuffers.contains(chunk.buffer)) { - assert chunk.buffer.isLocked() : chunk.buffer + " is in lockedBuffers but is not locked"; - } else if (chunk.buffer.lock(true)) { - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locked " + chunk.buffer + " for " + result); - } - lockedBuffers.add(chunk.buffer); - blocksToUnlock += bit; - } else { - failedToLock = true; - break; - } - bit >>>= 1; - chunk = chunk.nextChunk; - if (bit == 1 && chunk != null) { - throw new AssertionError("Chunk chain was too long"); - } - } - if (!failedToLock) return true; - - bit = 1 << 63; - Chunk chunk2 = result; - while (chunk2 != chunk) { - if ((blocksToUnlock & bit) == bit) { - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Unlocking " + chunk2.buffer + " due to failed chunk lock"); - } - lockedBuffers.remove(chunk2.buffer); - chunk2.buffer.unlock(); - } - bit >>>= 1; - chunk2 = chunk2.nextChunk; - } - return false; - } - - private boolean verifyChunk(Chunk entry) { - Chunk chunk = entry; - while (chunk != null) { - if (!chunk.buffer.lock(false)) break; - chunk = chunk.nextChunk; - } - Chunk chunk2 = entry; - while (chunk2 != chunk) { - chunk2.buffer.unlock(); - chunk2 = chunk2.nextChunk; - } - return chunk == null; - } - - public Chunk addOrGetChunk(K key, Chunk val, HashSet lockedBuffers) { - assert val.buffer.isLocked(); - while (true) { - Chunk oldVal = chunkCache.putIfAbsent(key, val); - if (oldVal == null) return val; - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for " - + key + "; old " + oldVal + ", new " + val); - } - if (lockChunk(oldVal, lockedBuffers)) return oldVal; - // We found some old value but couldn't lock it; remove it. - chunkCache.remove(key, oldVal); - } - } - - //@Override - public void evictionNotice(WeakBuffer evicted) { - int oldValue = newEvictions.getAndIncrement(); - if (oldValue == 0) { - synchronized (newEvictions) { - newEvictions.notifyAll(); - } - } - } - - public static class Chunk { - public WeakBuffer buffer; - public int offset, length; - public Chunk nextChunk; - - public Chunk(WeakBuffer buffer, int offset, int length) { - this.buffer = buffer; - this.offset = offset; - this.length = length; - } - - public Chunk addChunk(Chunk another) { - // Traversing list is bad; however, we expect that this will very rarely happen; and in - // nearly all the cases when it does (buffer boundary) the list will have 1 element. 
- Chunk chunk = this; - while (chunk.nextChunk != null) { - chunk = chunk.nextChunk; - } - chunk.nextChunk = another; - return this; - } - - @Override - public String toString() { - return "{" + buffer + ", " + offset + ", " + length + "}"; - } - - public String toFullString() { - String result = ""; - Chunk chunk = this; - while (chunk != null) { - result += chunk.toString() + ", "; - chunk = chunk.nextChunk; - } - return result; - } - } - - private final class CleanupThread extends Thread { - private int APPROX_CLEANUP_INTERVAL_SEC = 600; - - public CleanupThread() { - super("Llap ChunkPool cleanup thread"); - setDaemon(true); - setPriority(1); - } - - @Override - public void run() { - while (true) { - try { - doOneCleanupRound(); - } catch (InterruptedException ex) { - LlapIoImpl.LOG.warn("Cleanup thread has been interrupted"); - Thread.currentThread().interrupt(); - break; - } catch (Throwable t) { - LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t); - break; - } - } - } - - private void doOneCleanupRound() throws InterruptedException { - while (true) { - int evictionsSinceLast = newEvictions.getAndSet(0); - if (evictionsSinceLast > 0) break; - synchronized (newEvictions) { - newEvictions.wait(10000); - } - } - // Duration is an estimate; if the size of the map changes rapidly, it can be very different. - long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L; - int processed = 0; - // TODO: if this iterator affects the map in some bad way, - // we'd need to sleep once per round instead. - Iterator> iter = chunkCache.entrySet().iterator(); - while (iter.hasNext()) { - if (!verifyChunk(iter.next().getValue())) { - iter.remove(); - } - ++processed; - int approxElementsLeft = chunkCache.size() - processed; - Thread.sleep((approxElementsLeft <= 0) - ? 1 : (endTime - System.nanoTime()) / (1000000L * approxElementsLeft)); - } - } - } -} diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties index 5051ca5..a79a3e8 100644 --- a/llap-server/src/main/resources/llap-daemon-log4j2.properties +++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties @@ -64,7 +64,19 @@ appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex} # list of all loggers -loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger +loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking + +logger.LlapIoImpl.name = LlapIoImpl +logger.LlapIoImpl.level = WARN + +logger.LlapIoOrc.name = LlapIoOrc +logger.LlapIoOrc.level = WARN + +logger.LlapIoCache.name = LlapIoCache +logger.LlapIoCache.level = WARN + +logger.LlapIoLocking.name = LlapIoLocking +logger.LlapIoLocking.level = WARN logger.NIOServerCnxn.name = org.apache.zookeeper.server.NIOServerCnxn logger.NIOServerCnxn.level = WARN diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java index ea626d7..3d81e43 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java @@ -23,49 +23,6 @@ * trace messages with low runtime cost, in order to investigate reproducible bugs.
*/ public class DebugUtils { - - public static boolean isTraceEnabled() { - return false; - } - - public static boolean isTraceOrcEnabled() { - return false; - } - - public static boolean isTraceLockingEnabled() { - return false; - } - - public static boolean isTraceMttEnabled() { - return false; - } - - public static boolean isTraceCachingEnabled() { - return false; - } - - public static String toString(long[] a, int offset, int len) { - StringBuilder b = new StringBuilder(); - b.append('['); - for (int i = offset; i < offset + len; ++i) { - b.append(a[i]); - b.append(", "); - } - b.append(']'); - return b.toString(); - } - - public static String toString(byte[] a, int offset, int len) { - StringBuilder b = new StringBuilder(); - b.append('['); - for (int i = offset; i < offset + len; ++i) { - b.append(a[i]); - b.append(", "); - } - b.append(']'); - return b.toString(); - } - public static String toString(boolean[] a) { StringBuilder b = new StringBuilder(); b.append('['); diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LogLevels.java b/ql/src/java/org/apache/hadoop/hive/llap/LogLevels.java deleted file mode 100644 index 300230f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/llap/LogLevels.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional debugrmation - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.llap; - -import org.slf4j.Logger; - -public class LogLevels { - private final boolean isT, isD, isI, isW, isE; - - public LogLevels(Logger log) { - isT = log.isTraceEnabled(); - isD = log.isDebugEnabled(); - isI = log.isInfoEnabled(); - isW = log.isWarnEnabled(); - isE = log.isErrorEnabled(); - } - - public boolean isTraceEnabled() { - return isT; - } - - public boolean isDebugEnabled() { - return isD; - } - - public boolean isInfoEnabled() { - return isI; - } - - public boolean isWarnEnabled() { - return isW; - } - - public boolean isErrorEnabled() { - return isE; - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index b8490df..31f5c72 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -815,7 +815,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { aggregationBatchInfo = new VectorAggregationBufferBatch(); aggregationBatchInfo.compileAggregationBatchInfo(aggregators); } - LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput); + LOG.info("VectorGroupByOperator is vector output {}", isVectorOutput); outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector( outputFieldNames, objectInspectors); if (isVectorOutput) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 6cec80e..2f90320 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -103,7 +103,6 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final List types; private final long rowIndexStride; private final DataCache cache; - private ByteBufferAllocatorPool pool; private boolean isDebugTracingEnabled; public EncodedReaderImpl(Long fileId, List types, CompressionCodec codec, @@ -468,9 +467,6 @@ public void setDebugTracing(boolean isEnabled) { @Override public void close() throws IOException { dataReader.close(); - if (pool != null) { - pool.clear(); - } } /** @@ -770,7 +766,6 @@ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffse * to handle just for this case. * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our * allocator. Uncompressed case is not mainline though so let's not complicate it. - * @param qfCounters */ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start, long streamOffset, long streamEnd) throws IOException { @@ -1133,7 +1128,6 @@ public String toString() { * multiple ranges (usually, that would only happen with zcr). * Adds stuff to cachedBuffers, toDecompress and toRelease (see below what each does). * @param current BufferChunk where compression block starts. - * @param ranges Iterator of all chunks, pointing at current. * @param cacheBuffers The result buffer array to add pre-allocated target cache buffer. * @param toDecompress The list of work to decompress - pairs of compressed buffers and the * target buffers (same as the ones added to cacheBuffers). @@ -1250,9 +1244,7 @@ private void addIncompleteCompressionBuffer( * @param isUncompressed Whether the data in the block is uncompressed. 
* @param cbStartOffset Compressed start offset of the fCB. * @param cbEndOffset Compressed end offset of the fCB. - * @param lastRange The buffer from which the last (or all) bytes of fCB come. * @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock. - * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange. * @param lastChunk * @param toDecompress See addOneCompressionBuffer. * @param cacheBuffers See addOneCompressionBuffer.
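A closing note on how the pieces fit together: with the hard-coded DebugUtils switches and the LogLevels snapshot class removed, verbosity is controlled entirely by the four named loggers that llap-daemon-log4j2.properties now registers at WARN. Raising, say, LlapIoLocking or LlapIoCache to INFO brings back the old trace output without a rebuild, and because the code asks the logger directly instead of caching level flags at construction time, a log4j2 reconfiguration can take effect while the daemon is running. The snippet below is a hypothetical sanity check, not part of the patch, that prints which of those loggers are currently enabled:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LlapIoLogLevelCheck {
      public static void main(String[] args) {
        // Logger names as registered by the patch in llap-daemon-log4j2.properties.
        String[] names = {"LlapIoImpl", "LlapIoOrc", "LlapIoCache", "LlapIoLocking"};
        for (String name : names) {
          Logger log = LoggerFactory.getLogger(name);
          System.out.println(name + ": info=" + log.isInfoEnabled() + ", debug=" + log.isDebugEnabled());
        }
      }
    }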