diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java index 7d68294..3efbcc2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java @@ -186,9 +186,7 @@ private static void addCollectionEstimator(HashMap, ObjectEstimator> by fieldCol = (Collection)fieldObj; if (fieldCol.size() == 0) { fieldCol = null; - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Empty collection " + field); - } + LlapIoImpl.LOG.trace("Empty collection {}", field); } } if (fieldCol != null) { @@ -219,9 +217,7 @@ private static void addMapEstimator(HashMap, ObjectEstimator> byType, fieldCol = (Map)fieldObj; if (fieldCol.size() == 0) { fieldCol = null; - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Empty map " + field); - } + LlapIoImpl.LOG.trace("Empty map {}", field); } } if (fieldCol != null) { @@ -257,15 +253,11 @@ private static void addMapEstimator(HashMap, ObjectEstimator> byType, return new Class[] { (Class)types[0], (Class)types[1] }; } else { // TODO: we could try to get the declaring object and infer argument... stupid Java. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Cannot determine map type: " + field); - } + LlapIoImpl.LOG.trace("Cannot determine map type: {}", field); } } else { // TODO: we could try to get superclass or generic interfaces. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Non-parametrized map type: " + field); - } + LlapIoImpl.LOG.trace("Non-parametrized map type: {}", field); } return null; } @@ -279,15 +271,11 @@ private static void addMapEstimator(HashMap, ObjectEstimator> byType, return (Class)type; } else { // TODO: we could try to get the declaring object and infer argument... stupid Java. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Cannot determine collection type: " + field); - } + LlapIoImpl.LOG.trace("Cannot determine collection type: {}", field); } } else { // TODO: we could try to get superclass or generic interfaces. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Non-parametrized collection type: " + field); - } + LlapIoImpl.LOG.trace("Non-parametrized collection type: {}", field); } return null; } @@ -297,11 +285,7 @@ private static void addArrayEstimator( Field field, Object fieldObj) { if (fieldObj == null) return; int arrayLen = Array.getLength(fieldObj); - if (arrayLen == 0) { - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Empty array " + field); - } - } + LlapIoImpl.LOG.trace("Empty array {}", field); for (int i = 0; i < arrayLen; ++i) { Object element = Array.get(fieldObj, i); if (element != null) { @@ -416,10 +400,8 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, ObjectEstimator collEstimator = parent.get(fieldObj.getClass()); if (collEstimator == null) { // We have no estimator for this type... assume low overhead and hope for the best. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Approximate estimation for collection " - + fieldObj.getClass().getName() + " from " + e.field); - } + LlapIoImpl.LOG.trace("Approximate estimation for collection {} from {}", e.field, + fieldObj.getClass().getName()); referencedSize += memoryModel.object(); referencedSize += estimateCollectionElements(parent, c, e.field, uniqueObjects); referencedSize += memoryModel.array() + c.size() * memoryModel.ref(); @@ -429,10 +411,8 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, referencedSize += ((CollectionEstimator)collEstimator).estimateOverhead(c.size()); } else { // We decided to treat this collection as regular object. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Verbose estimation for collection " - + fieldObj.getClass().getName() + " from " + e.field); - } + LlapIoImpl.LOG.trace("Verbose estimation for collection {} from {}", + fieldObj.getClass().getName(), e.field); referencedSize += collEstimator.estimate(c, parent, uniqueObjects); } break; @@ -442,10 +422,8 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, ObjectEstimator collEstimator = parent.get(fieldObj.getClass()); if (collEstimator == null) { // We have no estimator for this type... assume low overhead and hope for the best. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Approximate estimation for map " - + fieldObj.getClass().getName() + " from " + e.field); - } + LlapIoImpl.LOG.trace("Approximate estimation for map {} from {}", + fieldObj.getClass().getName(), e.field); referencedSize += memoryModel.object(); referencedSize += estimateMapElements(parent, m, e.field, uniqueObjects); referencedSize += memoryModel.array() + m.size() @@ -456,10 +434,8 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, referencedSize += ((CollectionEstimator)collEstimator).estimateOverhead(m.size()); } else { // We decided to treat this map as regular object. - if (DebugUtils.isTraceEnabled()) { - LlapIoImpl.LOG.info("Verbose estimation for map " - + fieldObj.getClass().getName() + " from " + e.field); - } + LlapIoImpl.LOG.trace("Verbose estimation for map {} from {}", + fieldObj.getClass().getName(), e.field); referencedSize += collEstimator.estimate(m, parent, uniqueObjects); } break; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java index 840aeab..d1a961c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java @@ -83,8 +83,8 @@ int incRef() { newRefCount = oldRefCount + 1; if (refCount.compareAndSet(oldRefCount, newRefCount)) break; } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locked " + this + "; new ref count " + newRefCount); + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Locked {}; new ref count {}", this, newRefCount); } return newRefCount; } @@ -109,8 +109,8 @@ public boolean isInvalid() { int decRef() { int newRefCount = refCount.decrementAndGet(); - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Unlocked " + this + "; refcount " + newRefCount); + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Unlocked {}; refcount {}", this, newRefCount); } if (newRefCount < 0) { throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this); @@ -128,8 +128,8 @@ public boolean invalidate() { if (value != 0) return false; if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break; } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Invalidated " + this + " due to eviction"); + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Invalidated {} due to eviction", this); } return true; } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java index a60fed3..038c3ed 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java @@ -58,9 +58,8 @@ public LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cac @VisibleForTesting LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy, EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) { - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Low level cache; cleanup interval " + cleanupInterval + "sec"); - } + + LlapIoImpl.LOG.info("Low level cache; cleanup interval {} sec", cleanupInterval); this.cachePolicy = cachePolicy; this.allocator = allocator; this.cleanupInterval = cleanupInterval; @@ -148,8 +147,8 @@ private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCache LlapDataBuffer buffer = e.getValue(); long requestedLength = currentNotCached.getLength(); // Lock the buffer, validate it and add to results. - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locking " + buffer + " during get"); + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Locking {} during get", buffer); } if (!lockBuffer(buffer, true)) { @@ -183,7 +182,6 @@ private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCache * Adds cached buffer to buffer list. * @param currentNotCached Pointer to the list node where we are inserting. * @param currentCached The cached buffer found for this node, to insert. - * @param resultObj * @return The new currentNotCached pointer, following the cached buffer insertion. */ private DiskRangeList addCachedBufferToIter( @@ -240,8 +238,8 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { try { for (int i = 0; i < ranges.length; ++i) { LlapDataBuffer buffer = (LlapDataBuffer)buffers[i]; - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locking " + buffer + " at put time"); + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Locking {} at put time", buffer); } boolean canLock = lockBuffer(buffer, false); assert canLock; @@ -258,13 +256,13 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { } break; } - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for " - + fileKey + "@" + offset + " (base " + baseOffset + "); old " + oldVal - + ", new " + buffer); + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.trace("Trying to cache when the chunk is already cached for" + + " {}@{} (base {}); old {}, new {}", fileKey, offset, baseOffset, oldVal, buffer); } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locking " + oldVal + " due to cache collision"); + + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Locking {} due to cache collision", oldVal); } if (lockBuffer(oldVal, true)) { // We don't do proper overlap checking because it would cost cycles and we @@ -275,8 +273,9 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { + " (base " + baseOffset + ")"); } // We found an old, valid block for this key in the cache. - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Unlocking " + buffer + " due to cache collision with " + oldVal); + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} due to cache collision with {}", + buffer, oldVal); } unlockBuffer(buffer, false); @@ -353,8 +352,8 @@ private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) { if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) { cachePolicy.notifyUnlock(buffer); } else { - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached"); + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer); } allocator.deallocate(buffer); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index 1cfe2bc..4def4a1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -50,9 +50,9 @@ public LowLevelCacheMemoryManager( this.usedMemory = new AtomicLong(0); this.metrics = metrics; metrics.setCacheCapacityTotal(maxSize); - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Memory manager initialized with max size " + maxSize + " and " - + ((evictor == null) ? "no " : "") + "ability to evict blocks"); + if (LlapIoImpl.LOG.isInfoEnabled()) { + LlapIoImpl.LOG.info("Memory manager initialized with max size {} and" + + " {} ability to evict blocks", maxSize, ((evictor == null) ? "no " : "")); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java index 1430eae..0838682 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java @@ -35,9 +35,7 @@ private LlapOomDebugDump parentDebugDump; public LowLevelFifoCachePolicy(Configuration conf) { - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("FIFO cache policy"); - } + LlapIoImpl.LOG.info("FIFO cache policy"); buffers = new LinkedList(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 6f52b86..bbff3cc 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -82,10 +82,8 @@ public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration co int lrfuThreshold = (int)((Math.log(1 - Math.pow(0.5, lambda)) / Math.log(0.5)) / lambda); maxHeapSize = Math.min(lrfuThreshold, maxBuffers); } - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("LRFU cache policy with min buffer size " + minBufferSize - + " and lambda " + lambda + " (heap size " + maxHeapSize + ")"); - } + LlapIoImpl.LOG.info("LRFU cache policy with min buffer size {} and lambda {} (heap size {})", + minBufferSize, lambda, maxHeapSize); heap = new LlapCacheableBuffer[maxHeapSize]; listHead = listTail = null; @@ -123,8 +121,8 @@ public void notifyLock(LlapCacheableBuffer buffer) { @Override public void notifyUnlock(LlapCacheableBuffer buffer) { long time = timer.incrementAndGet(); - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Touching " + buffer + " at " + time); + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time); } synchronized (heap) { // First, update buffer priority - we have just been using it. @@ -263,8 +261,8 @@ private LlapCacheableBuffer evictFromHeapUnderLock(long time) { while (true) { if (heapSize == 0) return null; LlapCacheableBuffer result = heap[0]; - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Evicting " + result + " at " + time); + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.info("Evicting {} at {}", result, time); } result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; --heapSize; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java index 734a5c0..b188c0e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java @@ -30,9 +30,7 @@ private final LlapDaemonCacheMetrics metrics; public SimpleBufferManager(Allocator allocator, LlapDaemonCacheMetrics metrics) { - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Simple buffer manager"); - } + LlapIoImpl.LOG.info("Simple buffer manager"); this.allocator = allocator; this.metrics = metrics; } @@ -46,8 +44,8 @@ private boolean lockBuffer(LlapDataBuffer buffer) { private void unlockBuffer(LlapDataBuffer buffer) { if (buffer.decRef() == 0) { - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached"); + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer); } allocator.deallocate(buffer); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 85cca97..9fb79a5 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -264,21 +264,21 @@ ColumnVectorBatch nextCvb() throws InterruptedException, IOException { } synchronized (pendingData) { // We are waiting for next block. Either we will get it, or be told we are done. - boolean doLogBlocking = DebugUtils.isTraceMttEnabled() && isNothingToReport(); + boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport(); if (doLogBlocking) { - LlapIoImpl.LOG.info("next will block"); + LlapIoImpl.LOG.trace("next will block"); } while (isNothingToReport()) { pendingData.wait(100); } if (doLogBlocking) { - LlapIoImpl.LOG.info("next is unblocked"); + LlapIoImpl.LOG.trace("next is unblocked"); } rethrowErrorIfAny(); lastCvb = pendingData.poll(); } - if (DebugUtils.isTraceMttEnabled() && lastCvb != null) { - LlapIoImpl.LOG.info("Processing will receive vector " + lastCvb); + if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) { + LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb); } return lastCvb; } @@ -304,9 +304,9 @@ public long getPos() throws IOException { @Override public void close() throws IOException { - if (DebugUtils.isTraceMttEnabled()) { - LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + isDone - + ", err " + pendingError + ", pending " + pendingData.size()); + if (LlapIoImpl.LOG.isTraceEnabled()) { + LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}", + isClosed, isDone, pendingError, pendingData.size()); } LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged! feedback.stop(); @@ -323,9 +323,9 @@ private void rethrowErrorIfAny() throws IOException { @Override public void setDone() { - if (DebugUtils.isTraceMttEnabled()) { - LlapIoImpl.LOG.info("setDone called; closed " + isClosed - + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); + if (LlapIoImpl.LOG.isTraceEnabled()) { + LlapIoImpl.LOG.trace("setDone called; closed {}, done {}, err {}, pending {}", + isClosed, isDone, pendingError, pendingData.size()); } synchronized (pendingData) { isDone = true; @@ -335,9 +335,9 @@ public void setDone() { @Override public void consumeData(ColumnVectorBatch data) { - if (DebugUtils.isTraceMttEnabled()) { - LlapIoImpl.LOG.info("consume called; closed " + isClosed + ", done " + isDone - + ", err " + pendingError + ", pending " + pendingData.size()); + if (LlapIoImpl.LOG.isTraceEnabled()) { + LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}", + isClosed, isDone, pendingError, pendingData.size()); } synchronized (pendingData) { if (isClosed) { @@ -351,8 +351,8 @@ public void consumeData(ColumnVectorBatch data) { @Override public void setError(Throwable t) { counters.incrCounter(LlapIOCounters.NUM_ERRORS); - LlapIoImpl.LOG.info("setError called; closed " + isClosed - + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); + LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}", + isClosed, isDone, pendingError, pendingData.size()); assert t != null; synchronized (pendingData) { pendingError = t; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index dbee823..36f8dec 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.llap.io.api.impl; -import org.apache.hadoop.hive.llap.LogLevels; - import java.io.IOException; import java.util.concurrent.Executors; @@ -58,8 +56,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LlapIoImpl implements LlapIo { - public static final Logger LOG = LoggerFactory.getLogger(LlapIoImpl.class); - public static final LogLevels LOGL = new LogLevels(LOG); + public static final Logger LOG = LoggerFactory.getLogger("LlapIoImpl"); + public static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc"); + public static final Logger CACHE_LOGGER = LoggerFactory.getLogger("LlapIoCache"); + public static final Logger LOCKING_LOGGER = LoggerFactory.getLogger("LlapIoLocking"); + private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator"; private final ColumnVectorProducer cvp; @@ -73,9 +74,7 @@ private LlapIoImpl(Configuration conf) throws IOException { String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE); boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode), useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode); - if (LOGL.isInfoEnabled()) { - LOG.info("Initializing LLAP IO in " + ioMode + " mode"); - } + LOG.info("Initializing LLAP IO in {} mode", ioMode); String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName(); String sessionId = conf.get("llap.daemon.metrics.sessionid"); @@ -86,8 +85,8 @@ private LlapIoImpl(Configuration conf) throws IOException { HiveConf.ConfVars.LLAP_QUEUE_METRICS_PERCENTILE_INTERVALS)); this.queueMetrics = LlapDaemonQueueMetrics.create(displayName, sessionId, intervals); - LOG.info("Started llap daemon metrics with displayName: " + displayName + - " sessionId: " + sessionId); + LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", displayName, + sessionId); OrcMetadataCache metadataCache = null; LowLevelCacheImpl orcCache = null; @@ -128,9 +127,7 @@ private LlapIoImpl(Configuration conf) throws IOException { // TODO: this should depends on input format and be in a map, or something. this.cvp = new OrcColumnVectorProducer( metadataCache, orcCache, bufferManager, conf, cacheMetrics, queueMetrics); - if (LOGL.isInfoEnabled()) { - LOG.info("LLAP IO initialized"); - } + LOG.info("LLAP IO initialized"); registerMXBeans(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 37fc8d0..024c485 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -49,9 +49,7 @@ public OrcColumnVectorProducer(OrcMetadataCache metadataCache, LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, Configuration conf, LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) { - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Initializing ORC column vector producer"); - } + LlapIoImpl.LOG.info("Initializing ORC column vector producer"); this.metadataCache = metadataCache; this.lowLevelCache = lowLevelCache; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index eb251a8..fb0867d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -184,9 +184,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff @Override public void stop() { - if (LOG.isDebugEnabled()) { - LOG.debug("Encoded reader is being stopped"); - } + LOG.debug("Encoded reader is being stopped"); isStopped = true; } @@ -214,9 +212,7 @@ public Void run() throws Exception { protected Void performDataRead() throws IOException { long startTime = counters.startTimeCounter(); - if (LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Processing data for " + split.getPath()); - } + LlapIoImpl.LOG.info("Processing data for {}", split.getPath()); if (processStop()) { recordReaderTime(startTime); return null; @@ -310,7 +306,7 @@ protected Void performDataRead() throws IOException { // Reader creating updates HDFS counters, don't do it here. DataWrapperForOrc dw = new DataWrapperForOrc(); stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY); - stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled()); + stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled()); } catch (Throwable t) { consumer.setError(t); recordReaderTime(startTime); @@ -338,10 +334,8 @@ protected Void performDataRead() throws IOException { if (cols != null && cols.isEmpty()) continue; // No need to read this stripe. stripe = fileMetadata.getStripes().get(stripeIx); - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Reading stripe " + stripeIx + ": " - + stripe.getOffset() + ", " + stripe.getLength()); - } + LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, stripe.getOffset(), + stripe.getLength()); colRgs = readState[stripeIxMod]; // We assume that NO_RGS value is only set from SARG filter and for all columns; // intermediate changes for individual columns will unset values in the array. @@ -379,18 +373,18 @@ protected Void performDataRead() throws IOException { counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs); if (hasFileId && metadataCache != null) { stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata); - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx - + " metadata with includes: " + DebugUtils.toString(stripeIncludes)); + if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { + LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}", + stripeKey.stripeIx, DebugUtils.toString(stripeIncludes)); } } } consumer.setStripeMetadata(stripeMetadata); } if (!stripeMetadata.hasAllIndexes(stripeIncludes)) { - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx - + " metadata for includes: " + DebugUtils.toString(stripeIncludes)); + if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { + LlapIoImpl.ORC_LOGGER.trace("Updating indexes in stripe {} metadata for includes: {}", + stripeKey.stripeIx, DebugUtils.toString(stripeIncludes)); } assert isFoundInCache; counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); @@ -432,9 +426,8 @@ protected Void performDataRead() throws IOException { // Done with all the things. recordReaderTime(startTime); consumer.setDone(); - if (DebugUtils.isTraceMttEnabled()) { - LlapIoImpl.LOG.info("done processing " + split); - } + + LlapIoImpl.LOG.trace("done processing {}", split); // Close the stripe reader, we are done reading. cleanupReaders(); @@ -584,9 +577,7 @@ private void ensureOrcReader() throws IOException { if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) { path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey); } - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Creating reader for " + path + " (" + split.getPath() + ")"); - } + LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath()); long startTime = counters.startTimeCounter(); ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata); orcReader = EncodedOrcFile.createReader(path, opts); @@ -640,17 +631,17 @@ private OrcFileMetadata getOrReadFileMetadata() throws IOException { counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); if (hasFileId && metadataCache != null) { value = metadataCache.putStripeMetadata(value); - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx - + " metadata with includes: " + DebugUtils.toString(globalInc)); + if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { + LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}", + stripeKey.stripeIx, DebugUtils.toString(globalInc)); } } } // We might have got an old value from cache; recheck it has indexes. if (!value.hasAllIndexes(globalInc)) { - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx - + " metadata for includes: " + DebugUtils.toString(globalInc)); + if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { + LlapIoImpl.ORC_LOGGER.trace("Updating indexes in stripe {} metadata for includes: {}", + stripeKey.stripeIx, DebugUtils.toString(globalInc)); } updateLoadedIndexes(value, si, globalInc, sargColumns); } @@ -677,9 +668,9 @@ public void returnData(OrcEncodedColumnBatch ecb) { if (datas == null) continue; for (ColumnStreamData data : datas) { if (data == null || data.decRef() != 0) continue; - if (DebugUtils.isTraceLockingEnabled()) { + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { for (MemoryBuffer buf : data.getCacheBuffers()) { - LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing"); + LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} at the end of processing", buf); } } bufferManager.decRefBuffers(data.getCacheBuffers()); @@ -718,14 +709,14 @@ private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, boolean isNone = rgsToRead == SargApplier.READ_NO_RGS, isAll = rgsToRead == SargApplier.READ_ALL_RGS; hasAnyData = hasAnyData || !isNone; - if (DebugUtils.isTraceOrcEnabled()) { + if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { if (isNone) { - LlapIoImpl.LOG.info("SARG eliminated all RGs for stripe " + stripeIx); + LlapIoImpl.ORC_LOGGER.trace("SARG eliminated all RGs for stripe {}", stripeIx); } else if (!isAll) { - LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": " - + DebugUtils.toString(rgsToRead)); + LlapIoImpl.ORC_LOGGER.trace("SARG picked RGs for stripe {}: {}", + stripeIx, DebugUtils.toString(rgsToRead)); } else { - LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " + stripeIx); + LlapIoImpl.ORC_LOGGER.trace("Will read all {} RGs for stripe {}", rgCount, stripeIx); } } assert isAll || isNone || rgsToRead.length == rgCount; @@ -768,12 +759,12 @@ public void determineStripesToRead() { long offset = split.getStart(), maxOffset = offset + split.getLength(); stripeIxFrom = -1; int stripeIxTo = -1; - if (LlapIoImpl.LOGL.isDebugEnabled()) { + if (LlapIoImpl.ORC_LOGGER.isDebugEnabled()) { String tmp = "FileSplit {" + split.getStart() + ", " + split.getLength() + "}; stripes "; for (StripeInformation stripe : stripes) { tmp += "{" + stripe.getOffset() + ", " + stripe.getLength() + "}, "; } - LlapIoImpl.LOG.debug(tmp); + LlapIoImpl.ORC_LOGGER.debug(tmp); } int stripeIx = 0; @@ -785,33 +776,25 @@ public void determineStripesToRead() { continue; } if (stripeIxFrom == -1) { - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Including stripes from " + stripeIx - + " (" + stripeStart + " >= " + offset + ")"); - } + LlapIoImpl.ORC_LOGGER.trace("Including stripes from {} ({} >= {})", + stripeIx, stripeStart, offset); stripeIxFrom = stripeIx; } if (stripeStart >= maxOffset) { stripeIxTo = stripeIx; - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Including stripes until " + stripeIxTo + " (" + stripeStart - + " >= " + maxOffset + "); " + (stripeIxTo - stripeIxFrom) + " stripes"); - } + LlapIoImpl.ORC_LOGGER.trace("Including stripes until {} ({} >= {}); {} stripes", + stripeIxTo, stripeStart, maxOffset, (stripeIxTo - stripeIxFrom)); break; } ++stripeIx; } if (stripeIxFrom == -1) { - if (LlapIoImpl.LOG.isInfoEnabled()) { - LlapIoImpl.LOG.info("Not including any stripes - empty split"); - } + LlapIoImpl.LOG.info("Not including any stripes - empty split"); } if (stripeIxTo == -1 && stripeIxFrom != -1) { stripeIxTo = stripeIx; - if (DebugUtils.isTraceOrcEnabled()) { - LlapIoImpl.LOG.info("Including stripes until " + stripeIx + " (end of file); " - + (stripeIxTo - stripeIxFrom) + " stripes"); - } + LlapIoImpl.ORC_LOGGER.trace("Including stripes until {} (end of file); {} stripes", + stripeIx, (stripeIxTo - stripeIxFrom)); } readState = new boolean[stripeIxTo - stripeIxFrom][][]; } @@ -869,9 +852,9 @@ public DiskRangeList readFileData(DiskRangeList range, long baseOffset, long startTime = counters.startTimeCounter(); DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect); counters.recordHdfsTime(startTime); - if (DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled()) { - LOG.info("Disk ranges after disk read (file " + fileKey + ", base offset " + baseOffset - + "): " + RecordReaderUtils.stringifyDiskRanges(result)); + if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { + LlapIoImpl.ORC_LOGGER.trace("Disk ranges after disk read (file {}, base offset {}): {}", + fileKey, baseOffset, RecordReaderUtils.stringifyDiskRanges(result)); } return result; } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferInProgress.java b/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferInProgress.java deleted file mode 100644 index 9782b81..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferInProgress.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.old; - -import org.apache.hadoop.hive.llap.old.BufferPool.WeakBuffer; -import org.apache.hadoop.hive.llap.old.ChunkPool.Chunk; - -/** - * Helper struct that is used by loaders (e.g. OrcLoader) and chunk writer to write chunks. - */ -public class BufferInProgress { - /** Buffer that is being written to. */ - public final WeakBuffer buffer; - /** Offset in buffer where writing can proceed */ - public int offset; // TODO: use WB's position; these have separate lifecycle now, needed? - private final int bufferLimit; - - /** The chunk that is currently being written. */ - private Chunk chunkInProgress = null; - /** The row count of the chunk currently being written. */ - private int chunkInProgressRows = 0; - - public BufferInProgress(WeakBuffer buffer) { - this.buffer = buffer; - this.bufferLimit = buffer.getContents().limit(); - this.offset = 0; - } - - public Chunk ensureChunk() { - if (chunkInProgress == null) { - chunkInProgress = new Chunk(buffer, offset, 0); - chunkInProgressRows = 0; - } - return chunkInProgress; - } - - public Chunk extractChunk() { - Chunk result = chunkInProgress; - chunkInProgress = null; - chunkInProgressRows = 0; - return result; - } - - public void update(int newOffset, int rowsWritten) { - if (newOffset > bufferLimit) { - throw new AssertionError("Offset is beyond buffer limit: " + newOffset + "/" + bufferLimit - + "; previous offset " + offset + ", chunk " + chunkInProgress); - } - chunkInProgress.length += (newOffset - offset); - this.offset = newOffset; - this.chunkInProgressRows += rowsWritten; - } - - public int getChunkInProgressRows() { - return chunkInProgressRows; - } - - public int getSpaceLeft() { - return getSpaceLeft(-1); - } - - public int getSpaceLeft(int offset) { - offset = (offset >= 0) ? offset : this.offset; - return buffer.getContents().limit() - offset; - } -} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java deleted file mode 100644 index fc10b2b..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.hadoop.hive.llap.old; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.DebugUtils; -import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; - -import com.google.common.annotations.VisibleForTesting; - -public class BufferPool { - // TODO: we should keep evicted buffers for reuse. Perhaps that too should be factored out. - private final CachePolicy cachePolicy; - private final Object evictionNotifyObj = new Object(); - private int evictionIsWaiting; // best effort flag - private final long maxCacheSize; - private final int bufferSize; - - - public BufferPool(Configuration conf) { - this.maxCacheSize = 0;// HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_CACHE_SIZE); - this.bufferSize = 0; // HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_BUFFER_SIZE); - this.cachePolicy = null; - } - - /** - * Allocates a new buffer. Buffer starts out locked (assumption is that caller is going to - * write to it immediately and then unlock it; future writers/readers will lock and unlock). - * @return Buffer. - */ - public WeakBuffer allocateBuffer() throws InterruptedException { - // TODO: for now, dumb byte arrays. Should be off-heap. - ByteBuffer newBuffer = ByteBuffer.allocate(bufferSize); - WeakBuffer wb = new WeakBuffer(this, newBuffer); - // Don't touch the buffer - it's not in cache yet. cache() will set the initial priority. - if (!wb.lock(false)) { - throw new AssertionError("Cannot lock a new buffer"); - } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locked " + wb + " after creation"); - } - boolean hasWaited = false; - WeakBuffer evicted = null; - while (true) { - evicted = cachePolicy.cache(wb); - if (evicted != CachePolicy.CANNOT_EVICT) break; - if (DebugUtils.isTraceCachingEnabled() && !hasWaited) { - LlapIoImpl.LOG.info("Failed to add a new block to cache; waiting for blocks to be unlocked"); - hasWaited = true; - } - synchronized (evictionNotifyObj) { - ++evictionIsWaiting; - evictionNotifyObj.wait(1000); - --evictionIsWaiting; - } - } - if (DebugUtils.isTraceCachingEnabled() && hasWaited) { - LlapIoImpl.LOG.info("Eviction is done waiting"); - } - if (evicted != null) { - //if (evictionListener != null) { - // evictionListener.evictionNotice(evicted); - //} - // After eviction notice, the contents can be reset. - evicted.clear(); - } - return wb; - } - - private final void unblockEviction() { - if (evictionIsWaiting <= 0) return; - synchronized (evictionNotifyObj) { - if (evictionIsWaiting <= 0) return; - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Notifying eviction that some block has been unlocked"); - } - evictionNotifyObj.notifyAll(); - } - } - - @VisibleForTesting - public static WeakBuffer allocateFake() { - return new WeakBuffer(null, ByteBuffer.wrap(new byte[1])); - } - - /** - * This class serves 3 purposes: - * 1) it implements BufferPool-specific hashCode and equals (ByteBuffer ones are content-based); - * 2) it contains the refCount; - * 3) by extension from (2), it can be held while it is evicted; when locking before the usage, - * the fact that the data has been evicted will be discovered (similar to weak_ptr). - * Note: not static because when we wait for something to become evict-able, - * we need to receive notifications from unlock (see unlock). Otherwise could be static. - */ - public static final class WeakBuffer { - private static final int EVICTED_REFCOUNT = -1; - private final BufferPool parent; - private ByteBuffer contents; - private final AtomicInteger refCount = new AtomicInteger(0); - - // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object. - public double priority; - public long lastUpdate = -1; - public int indexInHeap = -1; - public boolean isLockedInHeap = false; - - private WeakBuffer(BufferPool parent, ByteBuffer contents) { - this.parent = parent; - this.contents = contents; - } - - public ByteBuffer getContents() { - assert isLocked() : "Cannot get contents with refCount " + refCount.get(); - return contents; - } - - @Override - public int hashCode() { - if (contents == null) return 0; - return System.identityHashCode(contents); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof WeakBuffer)) return false; - // We only compare objects, and not contents of the ByteBuffer. - // One ByteBuffer is never put in multiple WeakBuffer-s (that is the invariant). - return contents == ((WeakBuffer)obj).contents; - } - - public boolean lock(boolean doTouch) { - int oldRefCount = -1; - while (true) { - oldRefCount = refCount.get(); - if (oldRefCount == EVICTED_REFCOUNT) return false; - assert oldRefCount >= 0; - if (refCount.compareAndSet(oldRefCount, oldRefCount + 1)) break; - } - if (doTouch && oldRefCount == 0 && parent != null) { - parent.cachePolicy.notifyLock(this); - } - return true; - } - - public boolean isLocked() { - // Best-effort check. We cannot do a good check against caller thread, since - // refCount could still be > 0 if someone else locked. This is used for asserts. - return refCount.get() > 0; - } - - public boolean isInvalid() { - return refCount.get() == EVICTED_REFCOUNT; - } - - public boolean isCleared() { - return contents == null; - } - - public void unlock() { - int newRefCount = refCount.decrementAndGet(); - if (newRefCount < 0) { - throw new AssertionError("Unexpected refCount " + newRefCount); - } - // If this block became eligible, see if we need to unblock the eviction. - if (newRefCount == 0 && parent != null) { - parent.cachePolicy.notifyUnlock(this); - parent.unblockEviction(); - } - } - - @Override - public String toString() { - return "0x" + Integer.toHexString(hashCode()); - } - - /** - * @return Whether the we can invalidate; false if locked or already evicted. - */ - boolean invalidate() { - while (true) { - int value = refCount.get(); - if (value != 0) return false; - if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break; - } - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Invalidated " + this + " due to eviction"); - } - return true; - } - - ByteBuffer clear() { - assert refCount.get() == EVICTED_REFCOUNT; - ByteBuffer result = contents; - contents = null; - return result; - } - - public String toStringForCache() { - return "[" + Integer.toHexString(hashCode()) + " " + String.format("%1$.2f", priority) + " " - + lastUpdate + " " + (isLocked() ? "!" : ".") + "]"; - } - } -} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/old/CachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/old/CachePolicy.java deleted file mode 100644 index cca42fe..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/old/CachePolicy.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.old; - -import org.apache.hadoop.hive.llap.old.BufferPool.WeakBuffer; - -public interface CachePolicy { - public static final WeakBuffer CANNOT_EVICT = BufferPool.allocateFake(); - - /** - * @param buffer Buffer to cache. - * @return Evicted buffer. All buffers are of the same size currently, so it is one or none. - * It can also be CANNOT_EVICT fake buffer, if we cannot evict and thus cache. - */ - WeakBuffer cache(WeakBuffer buffer); - void notifyLock(WeakBuffer buffer); - void notifyUnlock(WeakBuffer buffer); -} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java deleted file mode 100644 index 4f9f165..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.old; - -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.hive.llap.DebugUtils; -import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; -import org.apache.hadoop.hive.llap.old.BufferPool.WeakBuffer; -import org.apache.hadoop.hive.llap.old.ChunkPool.Chunk; - -/** - * This class contains the mapping of file chunks to buffers inside BufferPool. - */ -public class ChunkPool /*implements EvictionListener*/ { - private final ConcurrentHashMap chunkCache = new ConcurrentHashMap(); - - /** Number of unprocessed evictions, for the background thread. */ - private final AtomicInteger newEvictions = new AtomicInteger(0); - private final Thread cleanupThread; - - public ChunkPool() { - cleanupThread = new CleanupThread(); - cleanupThread.start(); - } - - /** - * Gets a chunk from cache - * TODO: We expect that in most cases, some related chunks (e.g. columns for a stripe) - * will be stored in the same buffer. We could use this to get keys more efficiently. - * On the other hand, real stripes are pretty big. - * @param key key to search for. - * @return Chunk corresponding to k. - */ - public Chunk getChunk(K key, HashSet lockedBuffers) { - while (true) { - Chunk result = chunkCache.get(key); - if (result == null) return null; - if (lockChunk(result, lockedBuffers)) return result; - if (chunkCache.remove(key, result)) return null; - } - } - - private boolean lockChunk(Chunk result, HashSet lockedBuffers) { - // We expect the chain to have 1 or 2 buffers (2 if we are on buffer boundary). Keep track of - // what we lock in the bitmask; may need fixing (extremely unlikely - 64+ buffer, giant chunks) - boolean failedToLock = false; - long blocksToUnlock = 0; - long bit = 1 << 63; // The bit indicating that current chunk was locked. - - Chunk chunk = result; - while (chunk != null) { - if (lockedBuffers.contains(chunk.buffer)) { - assert chunk.buffer.isLocked() : chunk.buffer + " is in lockedBuffers but is not locked"; - } else if (chunk.buffer.lock(true)) { - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Locked " + chunk.buffer + " for " + result); - } - lockedBuffers.add(chunk.buffer); - blocksToUnlock += bit; - } else { - failedToLock = true; - break; - } - bit >>>= 1; - chunk = chunk.nextChunk; - if (bit == 1 && chunk != null) { - throw new AssertionError("Chunk chain was too long"); - } - } - if (!failedToLock) return true; - - bit = 1 << 63; - Chunk chunk2 = result; - while (chunk2 != chunk) { - if ((blocksToUnlock & bit) == bit) { - if (DebugUtils.isTraceLockingEnabled()) { - LlapIoImpl.LOG.info("Unlocking " + chunk2.buffer + " due to failed chunk lock"); - } - lockedBuffers.remove(chunk2.buffer); - chunk2.buffer.unlock(); - } - bit >>>= 1; - chunk2 = chunk2.nextChunk; - } - return false; - } - - private boolean verifyChunk(Chunk entry) { - Chunk chunk = entry; - while (chunk != null) { - if (!chunk.buffer.lock(false)) break; - chunk = chunk.nextChunk; - } - Chunk chunk2 = entry; - while (chunk2 != chunk) { - chunk2.buffer.unlock(); - chunk2 = chunk2.nextChunk; - } - return chunk == null; - } - - public Chunk addOrGetChunk(K key, Chunk val, HashSet lockedBuffers) { - assert val.buffer.isLocked(); - while (true) { - Chunk oldVal = chunkCache.putIfAbsent(key, val); - if (oldVal == null) return val; - if (DebugUtils.isTraceCachingEnabled()) { - LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for " - + key + "; old " + oldVal + ", new " + val); - } - if (lockChunk(oldVal, lockedBuffers)) return oldVal; - // We found some old value but couldn't lock it; remove it. - chunkCache.remove(key, oldVal); - } - } - - //@Override - public void evictionNotice(WeakBuffer evicted) { - int oldValue = newEvictions.getAndIncrement(); - if (oldValue == 0) { - synchronized (newEvictions) { - newEvictions.notifyAll(); - } - } - } - - public static class Chunk { - public WeakBuffer buffer; - public int offset, length; - public Chunk nextChunk; - - public Chunk(WeakBuffer buffer, int offset, int length) { - this.buffer = buffer; - this.offset = offset; - this.length = length; - } - - public Chunk addChunk(Chunk another) { - // Traversing list is bad; however, we expect that this will very rarely happen; and in - // nearly all the cases when it does (buffer boundary) the list will have 1 element. - Chunk chunk = this; - while (chunk.nextChunk != null) { - chunk = chunk.nextChunk; - } - chunk.nextChunk = another; - return this; - } - - @Override - public String toString() { - return "{" + buffer + ", " + offset + ", " + length + "}"; - } - - public String toFullString() { - String result = ""; - Chunk chunk = this; - while (chunk != null) { - result += chunk.toString() + ", "; - chunk = chunk.nextChunk; - } - return result; - } - } - - private final class CleanupThread extends Thread { - private int APPROX_CLEANUP_INTERVAL_SEC = 600; - - public CleanupThread() { - super("Llap ChunkPool cleanup thread"); - setDaemon(true); - setPriority(1); - } - - @Override - public void run() { - while (true) { - try { - doOneCleanupRound(); - } catch (InterruptedException ex) { - LlapIoImpl.LOG.warn("Cleanup thread has been interrupted"); - Thread.currentThread().interrupt(); - break; - } catch (Throwable t) { - LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t); - break; - } - } - } - - private void doOneCleanupRound() throws InterruptedException { - while (true) { - int evictionsSinceLast = newEvictions.getAndSet(0); - if (evictionsSinceLast > 0) break; - synchronized (newEvictions) { - newEvictions.wait(10000); - } - } - // Duration is an estimate; if the size of the map changes rapidly, it can be very different. - long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L; - int processed = 0; - // TODO: if this iterator affects the map in some bad way, - // we'd need to sleep once per round instead. - Iterator> iter = chunkCache.entrySet().iterator(); - while (iter.hasNext()) { - if (!verifyChunk(iter.next().getValue())) { - iter.remove(); - } - ++processed; - int approxElementsLeft = chunkCache.size() - processed; - Thread.sleep((approxElementsLeft <= 0) - ? 1 : (endTime - System.nanoTime()) / (1000000L * approxElementsLeft)); - } - } - } -} diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties index 5051ca5..268eb59 100644 --- a/llap-server/src/main/resources/llap-daemon-log4j2.properties +++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties @@ -64,7 +64,19 @@ appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex} # list of all loggers -loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger +loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking + +logger.LlapIoImpl.name = LlapIoImpl +logger.LlapIoImpl.level = INFO + +logger.LlapIoOrc.name = LlapIoOrc +logger.LlapIoOrc.level = WARN + +logger.LlapIoCache.name = LlapIoCache +logger.LlapIOCache.level = WARN + +logger.LlapIoLocking.name = LlapIoLocking +logger.LlapIoLocking.level = WARN logger.NIOServerCnxn.name = org.apache.zookeeper.server.NIOServerCnxn logger.NIOServerCnxn.level = WARN diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java index ea626d7..3d81e43 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java @@ -23,49 +23,6 @@ * trace messages with low runtime cost, in order to investigate reproducible bugs. */ public class DebugUtils { - - public static boolean isTraceEnabled() { - return false; - } - - public static boolean isTraceOrcEnabled() { - return false; - } - - public static boolean isTraceLockingEnabled() { - return false; - } - - public static boolean isTraceMttEnabled() { - return false; - } - - public static boolean isTraceCachingEnabled() { - return false; - } - - public static String toString(long[] a, int offset, int len) { - StringBuilder b = new StringBuilder(); - b.append('['); - for (int i = offset; i < offset + len; ++i) { - b.append(a[i]); - b.append(", "); - } - b.append(']'); - return b.toString(); - } - - public static String toString(byte[] a, int offset, int len) { - StringBuilder b = new StringBuilder(); - b.append('['); - for (int i = offset; i < offset + len; ++i) { - b.append(a[i]); - b.append(", "); - } - b.append(']'); - return b.toString(); - } - public static String toString(boolean[] a) { StringBuilder b = new StringBuilder(); b.append('['); diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LogLevels.java b/ql/src/java/org/apache/hadoop/hive/llap/LogLevels.java deleted file mode 100644 index 300230f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/llap/LogLevels.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional debugrmation - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap; - -import org.slf4j.Logger; - -public class LogLevels { - private final boolean isT, isD, isI, isW, isE; - - public LogLevels(Logger log) { - isT = log.isTraceEnabled(); - isD = log.isDebugEnabled(); - isI = log.isInfoEnabled(); - isW = log.isWarnEnabled(); - isE = log.isErrorEnabled(); - } - - public boolean isTraceEnabled() { - return isT; - } - - public boolean isDebugEnabled() { - return isD; - } - - public boolean isInfoEnabled() { - return isI; - } - - public boolean isWarnEnabled() { - return isW; - } - - public boolean isErrorEnabled() { - return isE; - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index b8490df..31f5c72 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -815,7 +815,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { aggregationBatchInfo = new VectorAggregationBufferBatch(); aggregationBatchInfo.compileAggregationBatchInfo(aggregators); } - LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput); + LOG.info("VectorGroupByOperator is vector output {}", isVectorOutput); outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector( outputFieldNames, objectInspectors); if (isVectorOutput) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java index 96af96a..4d09dcd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java @@ -54,5 +54,5 @@ void readEncodedColumns(int stripeIx, StripeInformation stripe, * checks are entirely eliminated because this method is called with constant value, similar * to just checking the constant in the first place. */ - void setDebugTracing(boolean isEnabled); + void setTracing(boolean isEnabled); } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 29b51ec..a45b674 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -42,7 +42,6 @@ import org.apache.orc.impl.StreamName; import org.apache.orc.StripeInformation; import org.apache.orc.impl.BufferChunk; -import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory; import org.apache.orc.OrcProto; @@ -103,8 +102,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final List types; private final long rowIndexStride; private final DataCache cache; - private ByteBufferAllocatorPool pool; - private boolean isDebugTracingEnabled; + private boolean isTracingEnabled; public EncodedReaderImpl(Object fileKey, List types, CompressionCodec codec, int bufferSize, long strideRate, DataCache cache, DataReader dataReader, PoolFactory pf) @@ -209,7 +207,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, long offset = 0; // Stream offset in relation to the stripe. // 1.1. Figure out which columns have a present stream boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("The following columns have PRESENT streams: " + arrayToString(hasNull)); } @@ -230,7 +228,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, // We have a stream for included column, but in future it might have no data streams. // It's more like "has at least one column included that has an index stream". hasIndexOnlyCols = hasIndexOnlyCols | included[colIx]; - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Skipping stream: " + streamKind + " at " + offset + ", " + length); } offset += length; @@ -244,7 +242,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, includedRgs = colRgs[colRgIx]; ctx = colCtxs[colRgIx] = new ColumnReadContext( colIx, encodings.get(colIx), indexes[colIx]); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString()); } } else { @@ -254,13 +252,13 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(), types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]); ctx.addStream(offset, stream, indexIx); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Adding stream for column " + colIx + ": " + streamKind + " at " + offset + ", " + length + ", index position " + indexIx); } if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) { RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Will read whole stream " + streamKind + "; added to " + listToRead.getTail()); } } else { @@ -287,14 +285,14 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, // 2. Now, read all of the ranges from cache or disk. DiskRangeList.MutateHelper toRead = new DiskRangeList.MutateHelper(listToRead.get()); - if (isDebugTracingEnabled && LOG.isInfoEnabled()) { + if (isTracingEnabled && LOG.isInfoEnabled()) { LOG.info("Resulting disk ranges to read (file " + fileKey + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } BooleanRef isAllInCache = new BooleanRef(); if (hasFileId) { cache.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache); - if (isDebugTracingEnabled && LOG.isInfoEnabled()) { + if (isTracingEnabled && LOG.isInfoEnabled()) { LOG.info("Disk ranges after cache (file " + fileKey + ", base offset " + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } @@ -322,7 +320,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, } } } - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Disk ranges after pre-read (file " + fileKey + ", base offset " + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } @@ -354,7 +352,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, ColumnStreamData cb = null; if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) { // This stream is for entire stripe and needed for every RG; uncompress once and reuse. - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for" + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length); } @@ -411,7 +409,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, } } - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Disk ranges after preparing all the data " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } @@ -437,7 +435,7 @@ private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg, int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) { ColumnStreamData cb = POOLS.csdPool.take(); cb.incRef(); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "") + "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", " + sctx.length + " index position " + sctx.streamIndexOffset + ": " + @@ -460,17 +458,14 @@ private void releaseInitialRefcounts(DiskRangeList current) { } @Override - public void setDebugTracing(boolean isEnabled) { - this.isDebugTracingEnabled = isEnabled; + public void setTracing(boolean isEnabled) { + this.isTracingEnabled = isEnabled; } @Override public void close() throws IOException { dataReader.close(); - if (pool != null) { - pool.clear(); - } } /** @@ -608,7 +603,7 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon // want to be, or just before. However, RGs can overlap due to encoding, so we may have // to return to a previous block. DiskRangeList current = findExactPosition(start, cOffset); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Starting read for [" + cOffset + "," + endCOffset + ") at " + current); } @@ -648,7 +643,7 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon } chunk.originalData = null; - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Locking " + chunk.getBuffer() + " due to reuse (after decompression)"); } cache.reuseBuffer(chunk.getBuffer()); @@ -691,13 +686,13 @@ private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset, if (current instanceof CacheChunk) { // 2a. This is a decoded compression buffer, add as is. CacheChunk cc = (CacheChunk)current; - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Locking " + cc.getBuffer() + " due to reuse"); } cache.reuseBuffer(cc.getBuffer()); columnStreamData.getCacheBuffers().add(cc.getBuffer()); currentOffset = cc.getEnd(); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Adding an already-uncompressed buffer " + cc.getBuffer()); } ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, cc); @@ -705,7 +700,7 @@ private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset, next = current.next; } else if (current instanceof IncompleteCb) { // 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates. - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Cannot read " + current); } next = null; @@ -739,7 +734,7 @@ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffse DiskRangeList next = null; assert current instanceof CacheChunk; lastUncompressed = (CacheChunk)current; - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Locking " + lastUncompressed.getBuffer() + " due to reuse"); } cache.reuseBuffer(lastUncompressed.getBuffer()); @@ -749,7 +744,7 @@ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffse } columnStreamData.getCacheBuffers().add(lastUncompressed.getBuffer()); currentOffset = lastUncompressed.getEnd(); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Adding an uncompressed buffer " + lastUncompressed.getBuffer()); } ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, lastUncompressed); @@ -770,7 +765,6 @@ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffse * to handle just for this case. * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our * allocator. Uncompressed case is not mainline though so let's not complicate it. - * @param qfCounters */ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start, long streamOffset, long streamEnd) throws IOException { @@ -780,7 +774,7 @@ private DiskRangeList preReadUncompressedStream(long baseOffset, // 1. Find our bearings in the stream. DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current); } @@ -836,7 +830,7 @@ private DiskRangeList preReadUncompressedStream(long baseOffset, current = current.split(partEnd); wasSplit = true; } - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Processing uncompressed file data at [" + current.getOffset() + ", " + current.getEnd() + ")"); } @@ -1058,7 +1052,7 @@ private void ponderReleaseInitialRefcount( private void releaseInitialRefcount(CacheChunk cc, boolean isBacktracking) { // This is the last RG for which this buffer will be used. Remove the initial refcount - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Unlocking " + cc.getBuffer() + " for the fetching thread" + (isBacktracking ? "; backtracking" : "")); } @@ -1081,7 +1075,7 @@ private void processCacheCollisions(long[] collisionMask, // Cache has found an old buffer for the key and put it into array instead of our new one. CacheChunk replacedChunk = toDecompress.get(i); MemoryBuffer replacementBuffer = targetBuffers[i]; - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Discarding data due to cache collision: " + replacedChunk.getBuffer() + " replaced with " + replacementBuffer); } @@ -1133,7 +1127,6 @@ public String toString() { * multiple ranges (usually, that would only happen with zcr). * Adds stuff to cachedBuffers, toDecompress and toRelease (see below what each does). * @param current BufferChunk where compression block starts. - * @param ranges Iterator of all chunks, pointing at current. * @param cacheBuffers The result buffer array to add pre-allocated target cache buffer. * @param toDecompress The list of work to decompress - pairs of compressed buffers and the * target buffers (same as the ones added to cacheBuffers). @@ -1157,7 +1150,7 @@ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, int consumedLength = chunkLength + OutStream.HEADER_SIZE; long cbEndOffset = cbStartOffset + consumedLength; boolean isUncompressed = ((b0 & 0x01) == 1); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total " + consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed"); } @@ -1183,7 +1176,7 @@ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, int remaining = chunkLength - compressed.remaining(); int originalPos = compressed.position(); copy.put(compressed); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Removing partial CB " + current + " from ranges after copying its contents"); } DiskRangeList next = current.next; @@ -1223,7 +1216,7 @@ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, DiskRangeList tmp = next; next = next.hasContiguousNext() ? next.next : null; if (next != null) { - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Removing partial CB " + tmp + " from ranges after copying its contents"); } tmp.removeSelf(); @@ -1237,7 +1230,7 @@ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, private void addIncompleteCompressionBuffer( long cbStartOffset, DiskRangeList target, int extraChunkCount) { IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd()); - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with " + icb + " in the buffers"); } @@ -1250,9 +1243,7 @@ private void addIncompleteCompressionBuffer( * @param isUncompressed Whether the data in the block is uncompressed. * @param cbStartOffset Compressed start offset of the fCB. * @param cbEndOffset Compressed end offset of the fCB. - * @param lastRange The buffer from which the last (or all) bytes of fCB come. * @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock. - * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange. * @param lastChunk * @param toDecompress See addOneCompressionBuffer. * @param cacheBuffers See addOneCompressionBuffer. @@ -1271,19 +1262,19 @@ private ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressi fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1); toDecompress.add(cc); // Adjust the compression block position. - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes"); } lastChunk.getChunk().position(lastChunk.getChunk().position() + lastChunkLength); // Finally, put it in the ranges list for future use (if shared between RGs). // Before anyone else accesses it, it would have been allocated and decompressed locally. if (lastChunk.getChunk().remaining() <= 0) { - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers"); } lastChunk.replaceSelfWith(cc); } else { - if (isDebugTracingEnabled) { + if (isTracingEnabled) { LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers"); } lastChunk.insertPartBefore(cc);