diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 12f4822e38..7d3bebcffc 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -357,9 +357,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_ALLOCATOR_ARENA_COUNT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_ALLOCATOR_DIRECT.varname); - llapDaemonVarsSetLocal.add(ConfVars.LLAP_USE_LRFU.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_CACHE_STRATEGY.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_LAMBDA.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_MAX_CLOCK_ROTATION.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_USE_FILEID_PATH.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS.varname); @@ -4187,6 +4188,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "partitions and tables) for reporting."), LLAP_USE_LRFU("hive.llap.io.use.lrfu", true, "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."), + LLAP_IO_CACHE_STRATEGY("hive.llap.io.replacement.strategy", "lrfu", new StringSet("fifo", "clock", "lrfu"), + "Cache replacement strategy used by low-level (default to LRFU)."), LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.1f, "Lambda for ORC low-level cache LRFU cache policy. Must be in [0, 1]. 
0 makes LRFU\n" + "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly.\n" + @@ -4196,6 +4199,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_LRFU_BP_WRAPPER_SIZE("hive.llap.io.lrfu.bp.wrapper.size", 64, "thread local queue " + "used to amortize the lock contention, the idea hear is to try locking as soon we reach max size / 2 " + "and block when max queue size reached"), + LLAP_IO_MAX_CLOCK_ROTATION("hive.llap.io.clock.max.rotation", + 5, + "Maximum number of clock rotation before giving up on clock operations like search eviction victim"), LLAP_CACHE_ALLOW_SYNTHETIC_FILEID("hive.llap.cache.allow.synthetic.fileid", true, "Whether LLAP cache should use synthetic file ID if real one is not available. Systems\n" + "like HDFS, Isilon, etc. provide a unique file/inode ID. On other FSes (e.g. local\n" + diff --git llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java index e5c4a00f3b..d5aa6bb0a3 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java @@ -22,16 +22,39 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; +import javax.annotation.Nullable; + public interface LlapIo { - InputFormat getInputFormat( - InputFormat sourceInputFormat, Deserializer serde); + /** + * @param sourceInputFormat source input format to be wrapped if possible + * @param serde source serializer and deserializer + * @return InputFormat wrapped by LLAP-IO caching and elevator primitive if possible, {@code null} otherwise. + */ + @Nullable + InputFormat getInputFormat(InputFormat sourceInputFormat, Deserializer serde); + + /** + * closing llap io cache at JVM shutdown. + */ void close(); - String getMemoryInfo(); + /** + * @return String with cache status. 
+ */ + String getMemoryInfo(); /** * purge is best effort and will just release the buffers that are unlocked (refCount == 0). This is typically * called when the system is idle. + * + * @return Total amount of purged bytes. */ long purge(); + + /** + * Used in cases where IO-Elevator mechanics is not usable. + * Used with InputFormat of type {@link org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface}. + * + * @param inputFormat source inputFormat. + */ void initCacheOnlyInputFormat(InputFormat inputFormat); } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java index b642c171a4..9f703dd17e 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java @@ -19,9 +19,7 @@ import java.lang.reflect.Constructor; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; -@SuppressWarnings("rawtypes") public class LlapProxy { private final static String IO_IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl"; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index 341da252c4..e7831bdd1d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -1408,8 +1408,7 @@ private int allocateWithSplit(int freeListIx, MemoryBuffer[] dest, return destIx; } - private void initializeNewlyAllocated( - LlapAllocatorBuffer buffer, int allocSize, int headerIx, int offset) { + private void initializeNewlyAllocated(LlapAllocatorBuffer buffer, int allocSize, int headerIx, int offset) { buffer.initialize(data, offset, allocSize); buffer.setNewAllocLocation(arenaIx, headerIx); } diff --git 
llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java index 733b30c7dd..50b2584346 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import static java.util.stream.Collectors.joining; @@ -65,7 +66,6 @@ public void run() { long deltaNs = timeNs - v.emptyTimeNs; if (deltaNs < cleanupTimeNs) { nextCleanupInNs = Math.min(nextCleanupInNs, deltaNs); - continue; } else { iter.remove(); } @@ -74,7 +74,8 @@ public void run() { sleepTimeMs = Math.max(MIN_TIME_MS, nextCleanupInNs / 1000000L); } } catch (InterruptedException ex) { - return; // Interrupted. + LlapIoImpl.LOG.warn("Cache content tracker cleanup thread is Interrupted", ex); + Thread.currentThread().interrupt(); } } } @@ -196,6 +197,7 @@ public void debugDumpShort(StringBuilder sb) { } sb.append("\nCache state: \n"); sb.append(endResult.stream().sorted().collect(joining("\n"))); + realPolicy.debugDumpShort(sb); } /** diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/ClockCachePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/ClockCachePolicy.java new file mode 100644 index 0000000000..5d9ebc5d30 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ClockCachePolicy.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.cache; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import javax.annotation.concurrent.GuardedBy; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Clock eviction policy. Uses a simple circular list to keep a ring of current used buffers. + * New entries are added to tail of the clock hand AKA (clockHand.prev) + * Eviction start at the current clock hand following the next pointer. + * + */ +public class ClockCachePolicy implements LowLevelCachePolicy { + + private static final int DEFAULT_MAX_CIRCLES = 5; + + /** + * Lock to protect the state of the policy, used as mutex when modifying the circular linked list. + */ + private final Lock listLock = new ReentrantLock(); + + /** + * The clock hand shared between threads thus made volatile, to ensure state when read outside of lock. + */ + @GuardedBy("listLock") + private volatile LlapCacheableBuffer clockHand; + + private EvictionListener evictionListener; + /** + * Max number of clock rotation before giving up on clock operation like eviction. 
+ */ + private final int maxCircles; + + public ClockCachePolicy() { + maxCircles = DEFAULT_MAX_CIRCLES; + } + + public ClockCachePolicy(int maxCircles) { + Preconditions.checkState(maxCircles > 0, "Maximum number of clock rotation must be positive and got " + maxCircles); + this.maxCircles = maxCircles; + } + + /** + * Signals to the policy the addition of a new entry to the cache. An entry comes with a priority that can be used as + * a hint to the replacement policy. + * + * @param buffer buffer to be cached + * @param priority the priority of cached element + */ + @Override public void cache(LlapCacheableBuffer buffer, LowLevelCache.Priority priority) { + listLock.lock(); + try { + //noinspection NonAtomicOperationOnVolatileField + clockHand = appendToCircularList(clockHand, buffer); + } finally { + listLock.unlock(); + } + } + + /** + * Appends new entry to the tail of circular list. + * + * @param head circular list head. + * @param buffer new entry to be added. + * @return the ring head. + */ + private static LlapCacheableBuffer appendToCircularList(LlapCacheableBuffer head, LlapCacheableBuffer buffer) { + if (head == null) { + return linkToItSelf(buffer); + } + buffer.next = head; + buffer.prev = head.prev; + head.prev.next = buffer; + head.prev = buffer; + return head; + } + + /** + * Links the entry to itself to form a ring. + * + * @param buffer input + * @return buffer + */ + private static LlapCacheableBuffer linkToItSelf(LlapCacheableBuffer buffer) { + buffer.prev = buffer; + buffer.next = buffer; + return buffer; + } + + @Override public void notifyLock(LlapCacheableBuffer buffer) { + buffer.setClockBit(); + } + + /** + * Notifies the policy that a buffer is unlocked after being used. This notification signals to the policy that an + * access to this page occurred. + * + * @param buffer buffer that just got unlocked after a read. 
+ */ + @Override public void notifyUnlock(LlapCacheableBuffer buffer) { + + } + + /** + * Signals to the policy that it has to evict some entries from the cache. + * Policy has to at least evict the amount of memory requested. + * Note that this method will block until at least {@code memoryToReserve} bytes are evicted. + * + * @param memoryToReserve amount of bytes to be evicted + * @return actual amount of evicted bytes. + */ + @Override public long evictSomeBlocks(long memoryToReserve) { + long evicted = 0; + if (clockHand == null) { + return evicted; + } + int fullClockRotation = 0; + listLock.lock(); + try { + // ring tail is used to mark a clock circle + LlapCacheableBuffer ringTail = clockHand.prev; + // ring head is the current clock position that is under lock. Using local var under lock and updating actual + // clock position as soon as we are done looping + LlapCacheableBuffer currentClockHead = clockHand; + + while (evicted < memoryToReserve && currentClockHead != null && fullClockRotation < maxCircles) { + if (ringTail == currentClockHead) { + fullClockRotation++; + } + if (currentClockHead.isClockBitSet()) { + // case the buffer is getting a second chance. + currentClockHead.unSetClockBit(); + currentClockHead = currentClockHead.next; + } else { + // try to evict this victim + int invalidateFlag = currentClockHead.invalidate(); + if (invalidateFlag == LlapCacheableBuffer.INVALIDATE_OK + || invalidateFlag == LlapCacheableBuffer.INVALIDATE_ALREADY_INVALID) { + if (invalidateFlag == LlapCacheableBuffer.INVALIDATE_OK) { + // case we are able to evict the buffer notify and account for it. + evictionListener.notifyEvicted(currentClockHead); + evicted += currentClockHead.getMemoryUsage(); + } + LlapCacheableBuffer newHand = currentClockHead.next; + if (newHand == currentClockHead) { + // end of the ring we have looped, nothing else can be done... + currentClockHead = null; + break; + } else { + //remove it from the ring. 
+ if (currentClockHead == ringTail) { + // we are about to remove the current ring tail thus we need to compute new tail + ringTail = ringTail.prev; + } + currentClockHead.prev.next = newHand; + newHand.prev = currentClockHead.prev; + currentClockHead = newHand; + } + } else if (invalidateFlag == LlapCacheableBuffer.INVALIDATE_FAILED) { + // can not be evicted case locked + currentClockHead = currentClockHead.next; + } else { + throw new IllegalStateException("Unknown invalidation flag " + invalidateFlag); + } + } + } + // done with clock rotations, update the current clock hand under lock. + clockHand = currentClockHead; + return evicted; + } finally { + listLock.unlock(); + } + } + + @Override public void setEvictionListener(EvictionListener listener) { + evictionListener = listener; + } + + @Override public long purge() { + return evictSomeBlocks(Long.MAX_VALUE); + } + + @Override public void debugDumpShort(StringBuilder sb) { + if (clockHand == null) { + sb.append("Clock is empty"); + return; + } + listLock.lock(); + try { + sb.append("Clock Status\n"); + LlapCacheableBuffer currentClockHand = clockHand; + LlapCacheableBuffer lastElement = clockHand.prev; + while (currentClockHand != lastElement) { + sb.append(currentClockHand.toStringForCache()); + currentClockHand = currentClockHand.next; + } + sb.append(lastElement.toStringForCache()); + } finally { + listLock.unlock(); + } + } + + @VisibleForTesting protected Iterator getIterator() { + final LlapCacheableBuffer currentHead = clockHand; + if (currentHead == null) { + return new Iterator() { + @Override public boolean hasNext() { + return false; + } + + @Override public LlapCacheableBuffer next() { + throw new NoSuchElementException("empty iterator"); + } + }; + } + final LlapCacheableBuffer tail = clockHand.prev; + return new Iterator() { + LlapCacheableBuffer current = currentHead; + private boolean isLast = false; + + @Override public boolean hasNext() { + return !isLast; + } + + @Override public 
LlapCacheableBuffer next() { + if (isLast) { + throw new NoSuchElementException("Iterator done"); + } + if (current == tail) { + isLast = true; + } + LlapCacheableBuffer r = current; + current = current.next; + return r; + } + }; + } + + @VisibleForTesting public LlapCacheableBuffer getClockHand() { + return clockHand; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java index 339335f88d..872dfe891e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java @@ -80,9 +80,32 @@ int tryIncRef() { return incRefInternal(false); } + /** + * + */ + @Override + public final void setClockBit() { + long val = state.get(); + while (!State.isClockBitSet(val) && !state.compareAndSet(val, State.setClockBit(val))) { + val = state.get(); + } + } + @Override + public final void unSetClockBit() { + long val = state.get(); + while (State.isClockBitSet(val) && !state.compareAndSet(val, State.unSetClockBit(val))) { + val = state.get(); + } + } + + @Override + public final boolean isClockBitSet() { + return State.isClockBitSet(state.get()); + } + static final int INCREF_EVICTED = -1, INCREF_FAILED = -2; private int incRefInternal(boolean doWait) { - long newValue = -1; + long newValue; while (true) { long oldValue = state.get(); if (State.hasFlags(oldValue, State.FLAG_EVICTED)) return INCREF_EVICTED; @@ -337,25 +360,34 @@ public Boolean endDiscard() { return result; } - private static final class State { - public static final int - FLAG_MOVING = 0b00001, // Locked by someone to move or force-evict. - FLAG_EVICTED = 0b00010, // Evicted. This is cache-specific. - FLAG_REMOVED = 0b00100, // Removed from allocator structures. The final state. - FLAG_MEM_RELEASED = 0b01000, // The memory was released to memory manager. 
- FLAG_NEW_ALLOC = 0b10000; // New allocation before the first use; cannot force-evict. - private static final int FLAGS_WIDTH = 5, - REFCOUNT_WIDTH = 19, ARENA_WIDTH = 16, HEADER_WIDTH = 24; - - public static final long MAX_REFCOUNT = (1 << REFCOUNT_WIDTH) - 1; - - private static final int REFCOUNT_SHIFT = FLAGS_WIDTH, - ARENA_SHIFT = REFCOUNT_SHIFT + REFCOUNT_WIDTH, HEADER_SHIFT = ARENA_SHIFT + ARENA_WIDTH; - - private static final long FLAGS_MASK = (1L << FLAGS_WIDTH) - 1, - REFCOUNT_MASK = ((1L << REFCOUNT_WIDTH) - 1) << REFCOUNT_SHIFT, - ARENA_MASK = ((1L << ARENA_WIDTH) - 1) << ARENA_SHIFT, - HEADER_MASK = ((1L << HEADER_WIDTH) - 1) << HEADER_SHIFT; + /** + * Utility class to manipulate the buffer state. + */ + static final class State { + static final int FLAG_MOVING = 0b00001; // Locked by someone to move or force-evict. + static final int FLAG_EVICTED = 0b00010; // Evicted. This is cache-specific. + static final int FLAG_REMOVED = 0b00100; // Removed from allocator structures. The final state. + static final int FLAG_MEM_RELEASED = 0b01000; // The memory was released to memory manager. + static final int FLAG_NEW_ALLOC = 0b10000; // New allocation before the first use; cannot force-evict. 
+ + static final int FLAGS_WIDTH = 5; + static final int REFCOUNT_WIDTH = 19; + static final int ARENA_WIDTH = 15; + static final int HEADER_WIDTH = 24; + static final int CLOCK_BIT_WIDTH = 1; + + static final long MAX_REFCOUNT = (1 << REFCOUNT_WIDTH) - 1; + + private static final int REF_COUNT_SHIFT = FLAGS_WIDTH; + private static final int ARENA_SHIFT = REF_COUNT_SHIFT + REFCOUNT_WIDTH; + private static final int HEADER_SHIFT = ARENA_SHIFT + ARENA_WIDTH; + private static final int CLOCK_BIT_SHIFT = HEADER_SHIFT + HEADER_WIDTH; + + static final long FLAGS_MASK = (1L << FLAGS_WIDTH) - 1; + static final long REFCOUNT_MASK = ((1L << REFCOUNT_WIDTH) - 1) << REF_COUNT_SHIFT; + static final long ARENA_MASK = ((1L << ARENA_WIDTH) - 1) << ARENA_SHIFT; + static final long HEADER_MASK = ((1L << HEADER_WIDTH) - 1) << HEADER_SHIFT; + static final long CLOCK_BIT_MASK = ((1L << CLOCK_BIT_WIDTH) - 1) << CLOCK_BIT_SHIFT; public static boolean hasFlags(long value, int flags) { return (value & flags) != 0; @@ -364,46 +396,58 @@ public static boolean hasFlags(long value, int flags) { public static int getAllFlags(long state) { return (int) (state & FLAGS_MASK); } - public static final int getRefCount(long state) { - return (int)((state & REFCOUNT_MASK) >>> REFCOUNT_SHIFT); + public static int getRefCount(long state) { + return (int)((state & REFCOUNT_MASK) >>> REF_COUNT_SHIFT); } - public static final int getArena(long state) { - return (int)((state & ARENA_MASK) >>> ARENA_SHIFT); + public static int getArena(long state) { + return (int) ((state & ARENA_MASK) >>> ARENA_SHIFT); } - public static final int getHeader(long state) { + public static int getHeader(long state) { return (int)((state & HEADER_MASK) >>> HEADER_SHIFT); } - public static final long incRefCount(long state) { + public static long incRefCount(long state) { // Note: doesn't check for overflow. Could AND with max refcount mask but the caller checks. 
- return state + (1 << REFCOUNT_SHIFT); + return state + (1 << REF_COUNT_SHIFT); } - public static final long decRefCount(long state) { + public static long decRefCount(long state) { // Note: doesn't check for overflow. Could AND with max refcount mask but the caller checks. - return state - (1 << REFCOUNT_SHIFT); + return state - (1 << REF_COUNT_SHIFT); + } + + public static long setClockBit(long state) { + return state | CLOCK_BIT_MASK; + } + + public static long unSetClockBit(long state) { + return state & ~CLOCK_BIT_MASK; + } + + public static boolean isClockBitSet(long state) { + return (state & CLOCK_BIT_MASK) == Long.MIN_VALUE; } - public static final long setLocation(long state, int arenaIx, int headerIx) { - long arenaVal = ((long)arenaIx) << ARENA_SHIFT, arenaWMask = arenaVal & ARENA_MASK; - long headerVal = ((long)headerIx) << HEADER_SHIFT, headerWMask = headerVal & HEADER_MASK; + public static long setLocation(long state, int arenaIx, int headerIx) { + long arenaVal = ((long) arenaIx) << ARENA_SHIFT, arenaWMask = arenaVal & ARENA_MASK; + long headerVal = ((long) headerIx) << HEADER_SHIFT, headerWMask = headerVal & HEADER_MASK; assert arenaVal == arenaWMask : "Arena " + arenaIx + " is wider than " + ARENA_WIDTH; assert headerVal == headerWMask : "Header " + headerIx + " is wider than " + HEADER_WIDTH; return (state & ~(ARENA_MASK | HEADER_MASK)) | arenaWMask | headerWMask; } - public static final long setFlag(long state, int flags) { + public static long setFlag(long state, int flags) { assert flags <= FLAGS_MASK; return state | flags; } - public static final long switchFlag(long state, int flags) { + public static long switchFlag(long state, int flags) { assert flags <= FLAGS_MASK; return state ^ flags; } public static String toFlagString(int state) { - return StringUtils.leftPad(Integer.toBinaryString(state), REFCOUNT_SHIFT, '0'); + return StringUtils.leftPad(Integer.toBinaryString(state), REF_COUNT_SHIFT, '0'); } } diff --git 
llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java index 05260332d5..dd819d6964 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java @@ -26,39 +26,77 @@ * using a real programming language. */ public abstract class LlapCacheableBuffer { - protected static final int IN_LIST = -2, NOT_IN_CACHE = -1; - - /** Priority for cache policy (should be pretty universal). */ - public double priority; - /** Last priority update time for cache policy (should be pretty universal). */ - public long lastUpdate = -1; + public static final int INVALIDATE_OK = 0, INVALIDATE_FAILED = 1, INVALIDATE_ALREADY_INVALID = 2; - // TODO: remove some of these fields as needed? + public CacheAttribute cacheAttribute; /** Linked list pointers for LRFU/LRU cache policies. Given that each block is in cache * that might be better than external linked list. Or not, since this is not concurrent. */ public LlapCacheableBuffer prev = null; /** Linked list pointers for LRFU/LRU cache policies. Given that each block is in cache * that might be better than external linked list. Or not, since this is not concurrent. */ public LlapCacheableBuffer next = null; - /** Index in heap for LRFU/LFU cache policies. */ - public int indexInHeap = NOT_IN_CACHE; - public static final int INVALIDATE_OK = 0, INVALIDATE_FAILED = 1, INVALIDATE_ALREADY_INVALID = 2; + /** + * @return result of invalidation. + */ protected abstract int invalidate(); + + /** + * @return size of the buffer in bytes. + */ public abstract long getMemoryUsage(); + + /** + * @param evictionDispatcher dispatcher object to be notified. 
+ */ public abstract void notifyEvicted(EvictionDispatcher evictionDispatcher); + /** + * Set the clock bit to true, should be thread safe + */ + public abstract void setClockBit(); + + /** + * Set the clock bit to false, should be thread safe. + */ + public abstract void unSetClockBit(); + + /** + * @return value of the clock bit. + */ + public abstract boolean isClockBitSet(); + @Override public String toString() { return "0x" + Integer.toHexString(System.identityHashCode(this)); } public String toStringForCache() { - return "[" + Integer.toHexString(hashCode()) + " " + String.format("%1$.2f", priority) + " " - + lastUpdate + " " + (isLocked() ? "!" : ".") + "]"; + return cacheAttribute == null ? + "NONE" : + String.format("[ObjectId %s, Entry Attributes %s, Locked %s]", + Integer.toHexString(hashCode()), + cacheAttribute.toString(), + isLocked()); + } + /** + * @return human readable tag used by the cache content tracker. + */ public abstract CacheTag getTag(); + /** + * @return true if the buffer is locked as part of query execution. 
+ */ protected abstract boolean isLocked(); + + public interface CacheAttribute { + double getPriority(); + void setPriority(double priority); + long getLastUpdate(); + void setTouchTime(long time); + int getIndex(); + void setIndex(int index); + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java index aa5ad66314..ed5a9c5d10 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.llap.cache; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; /** @@ -26,6 +28,32 @@ */ public interface LowLevelCachePolicy extends LlapIoDebugDump { + static LowLevelCachePolicy provideFromConf(Configuration conf) { + final long totalMemorySize = HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE); + final int minAllocSize = (int) HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); + String policyName = HiveConf.getVar(conf,HiveConf.ConfVars.LLAP_IO_CACHE_STRATEGY); + //default to fifo. 
+ final LowLevelCachePolicy realCachePolicy; + switch (policyName) { + case "lrfu": + realCachePolicy = new LowLevelLrfuCachePolicy(minAllocSize, totalMemorySize, conf); + break; + case "clock": + realCachePolicy = new ClockCachePolicy(HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_MAX_CLOCK_ROTATION)); + break; + case "fifo": + realCachePolicy = new LowLevelFifoCachePolicy(); + break; + default: + throw new IllegalArgumentException("Unknown cache replacement strategy [" + policyName +"]"); + } + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE)) { + return new CacheContentsTracker(realCachePolicy); + } else { + return realCachePolicy; + } + } + /** * Signals to the policy the addition of a new page to the cache directory. * diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 2afb899148..d04f3e2071 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -43,6 +43,8 @@ * Additionally, buffer locking has to be handled (locked buffer cannot be evicted). */ public final class LowLevelLrfuCachePolicy implements LowLevelCachePolicy { + public static final int IN_LIST = -2; + public static final int NOT_IN_CACHE = -1; private final double lambda; private double f(long x) { return Math.pow(0.5, lambda * x); @@ -116,18 +118,15 @@ public void cache(LlapCacheableBuffer buffer, Priority priority) { // we simply do nothing here. The fact that it was never updated will allow us to add it // properly on the first notifyUnlock. // We'll do is set priority, to account for the inbound one. No lock - not in heap. - assert buffer.lastUpdate == -1; long time = timer.incrementAndGet(); - buffer.priority = F0; - buffer.lastUpdate = time; if (priority == Priority.HIGH) { // This is arbitrary. 
Note that metadata may come from a big scan and nuke all the data + * from some small frequently accessed tables, because it gets such a large priority boost + * to start with. Think of the multiplier as the number of accesses after which the data + * becomes more important than some random read-once metadata, in a pure-LFU scheme. - buffer.priority *= 3; + buffer.cacheAttribute = new LrfuCacheAttribute(F0 * 3, time); + } else { - assert priority == Priority.NORMAL; + buffer.cacheAttribute = new LrfuCacheAttribute(F0, time); + } } @@ -138,7 +137,7 @@ public void notifyLock(LlapCacheableBuffer buffer) { // a locked item in either, it will remove it from cache; when we unlock, we are going to // put it back or update it, depending on whether this has happened. This should cause // most of the expensive cache update work to happen in unlock, not blocking processing. - if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST || !listLock.tryLock()) { + if (buffer.cacheAttribute == null || buffer.cacheAttribute.getIndex() != IN_LIST || !listLock.tryLock()) { return; } @@ -189,17 +188,24 @@ private void doNotifyUnderHeapLock(int count, LlapCacheableBuffer[] cacheableBuf LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time); } // First, update buffer priority - we have just been using it. - buffer.priority = (buffer.lastUpdate == -1) ? F0 - : touchPriority(time, buffer.lastUpdate, buffer.priority); - buffer.lastUpdate = time; + LrfuCacheAttribute lrfuCacheAttribute = (LrfuCacheAttribute) buffer.cacheAttribute; + if (lrfuCacheAttribute == null) { + //This should not happen but it is possible that adding the attribute on cache call happens after this call + lrfuCacheAttribute = new LrfuCacheAttribute(F0, time); + buffer.cacheAttribute = lrfuCacheAttribute; + } else { + lrfuCacheAttribute.priority = touchPriority(time, lrfuCacheAttribute.lastUpdate, lrfuCacheAttribute.priority); + lrfuCacheAttribute.lastUpdate = time; + } + // Then, if the buffer was in the list, remove it. 
- if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) { + if (lrfuCacheAttribute.indexInHeap == IN_LIST) { listLock.lock(); removeFromListAndUnlock(buffer); } // The only concurrent change that can happen when we hold the heap lock is list removal; // we have just ensured the item is not in the list, so we have a definite state now. - if (buffer.indexInHeap >= 0) { + if (lrfuCacheAttribute.indexInHeap >= 0) { // The buffer has lived in the heap all along. Restore heap property. heapifyDownUnderLock(buffer, time); } else if (heapSize == heap.length) { @@ -207,8 +213,8 @@ private void doNotifyUnderHeapLock(int count, LlapCacheableBuffer[] cacheableBuf LlapCacheableBuffer demoted = heap[0]; listLock.lock(); try { - assert demoted.indexInHeap == 0; // Noone could have moved it, we have the heap lock. - demoted.indexInHeap = LlapCacheableBuffer.IN_LIST; + //assert demoted.indexInHeap == 0; // Noone could have moved it, we have the heap lock. + demoted.cacheAttribute.setIndex(IN_LIST); demoted.prev = null; if (listHead != null) { demoted.next = listHead; @@ -223,12 +229,12 @@ private void doNotifyUnderHeapLock(int count, LlapCacheableBuffer[] cacheableBuf listLock.unlock(); } // Now insert the new buffer in its place and restore heap property. - buffer.indexInHeap = 0; + lrfuCacheAttribute.indexInHeap = 0; heapifyDownUnderLock(buffer, time); } else { // Heap is not full, add the buffer to the heap and restore heap property up. 
assert heapSize < heap.length : heap.length + " < " + heapSize; - buffer.indexInHeap = heapSize; + lrfuCacheAttribute.indexInHeap = heapSize; heapifyUpUnderLock(buffer, time); ++heapSize; } @@ -250,7 +256,7 @@ public long purge() { oldTail = listTail; while (current != null) { boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK == current.invalidate(); - current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; + current.cacheAttribute.setIndex(NOT_IN_CACHE); if (canEvict) { current = current.prev; } else { @@ -276,7 +282,7 @@ public long purge() { heapSize = 0; for (int i = 0; i < oldHeapSize; ++i) { LlapCacheableBuffer result = oldHeap[i]; - result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; + result.cacheAttribute.setIndex(NOT_IN_CACHE); int invalidateResult = result.invalidate(); if (invalidateResult != LlapCacheableBuffer.INVALIDATE_OK) { oldHeap[i] = null; // Removed from heap without evicting. @@ -370,7 +376,7 @@ private long evictFromList(long memoryToReserve) { continue; } // Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us. - nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; + nextCandidate.cacheAttribute.setIndex(NOT_IN_CACHE); evicted += nextCandidate.getMemoryUsage(); nextCandidate = nextCandidate.prev; } @@ -409,8 +415,8 @@ private LlapCacheableBuffer evictFromHeapUnderLock(long time) { private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) { // See heapifyDown comment. - int ix = buffer.indexInHeap; - double priority = buffer.priority; + int ix = buffer.cacheAttribute.getIndex(); + double priority = buffer.cacheAttribute.getPriority(); while (true) { if (ix == 0) { break; // Buffer is at the top of the heap. 
@@ -422,10 +428,10 @@ private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) { break; } heap[ix] = parent; - parent.indexInHeap = ix; + parent.cacheAttribute.setIndex(ix); ix = parentIx; } - buffer.indexInHeap = ix; + buffer.cacheAttribute.setIndex(ix); heap[ix] = buffer; } @@ -434,16 +440,18 @@ private LlapCacheableBuffer evictHeapElementUnderLock(long time, int ix) { if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { LlapIoImpl.CACHE_LOGGER.trace("Evicting {} at {}", result, time); } - result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; + LrfuCacheAttribute lrfuCacheAttribute = (LrfuCacheAttribute) result.cacheAttribute; + lrfuCacheAttribute.indexInHeap = NOT_IN_CACHE; --heapSize; int invalidateResult = result.invalidate(); boolean canEvict = invalidateResult == LlapCacheableBuffer.INVALIDATE_OK; if (heapSize > 0) { LlapCacheableBuffer newRoot = heap[heapSize]; - newRoot.indexInHeap = ix; - if (newRoot.lastUpdate != time) { - newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority); - newRoot.lastUpdate = time; + LrfuCacheAttribute newRootCacheAttribute = (LrfuCacheAttribute) newRoot.cacheAttribute; + newRootCacheAttribute.indexInHeap = ix; + if (newRootCacheAttribute.lastUpdate != time) { + newRootCacheAttribute.priority = expirePriority(time, newRootCacheAttribute.lastUpdate, newRootCacheAttribute.priority); + newRootCacheAttribute.lastUpdate = time; } heapifyDownUnderLock(newRoot, time); } @@ -457,8 +465,8 @@ private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) { // down; therefore, we can update priorities of other blocks as we go for part of the heap - // we correct any discrepancy w/the parent after expiring priority, and any block we expire // the priority for already has lower priority than that of its children. 
- int ix = buffer.indexInHeap; - double priority = buffer.priority; + int ix = buffer.cacheAttribute.getIndex(); + double priority = buffer.cacheAttribute.getPriority(); while (true) { int newIx = moveMinChildUp(ix, time, priority); if (newIx == -1) { @@ -466,7 +474,7 @@ private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) { } ix = newIx; } - buffer.indexInHeap = ix; + buffer.cacheAttribute.setIndex(ix); heap[ix] = buffer; } @@ -491,29 +499,32 @@ private int moveMinChildUp(int targetPos, long time, double comparePri) { } if (leftPri <= rightPri) { // prefer left, cause right might be missing heap[targetPos] = left; - left.indexInHeap = targetPos; + LrfuCacheAttribute leftCacheAttribute = (LrfuCacheAttribute) left.cacheAttribute; + leftCacheAttribute.indexInHeap = targetPos; return leftIx; } else { heap[targetPos] = right; - right.indexInHeap = targetPos; + LrfuCacheAttribute rightCacheAttribute = (LrfuCacheAttribute) right.cacheAttribute; + rightCacheAttribute.indexInHeap = targetPos; return rightIx; } } private double getHeapifyPriority(LlapCacheableBuffer buf, long time) { - if (buf == null) { + if (buf == null || buf.cacheAttribute == null) { return Double.MAX_VALUE; } - if (buf.lastUpdate != time && time >= 0) { - buf.priority = expirePriority(time, buf.lastUpdate, buf.priority); - buf.lastUpdate = time; + LrfuCacheAttribute lrfuCacheAttribute= (LrfuCacheAttribute) buf.cacheAttribute; + if (lrfuCacheAttribute.lastUpdate != time && time >= 0) { + lrfuCacheAttribute.priority = expirePriority(time, lrfuCacheAttribute.lastUpdate, lrfuCacheAttribute.priority); + lrfuCacheAttribute.lastUpdate = time; } - return buf.priority; + return lrfuCacheAttribute.priority; } private void removeFromListAndUnlock(LlapCacheableBuffer buffer) { try { - if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) { + if (buffer.cacheAttribute.getIndex() != IN_LIST) { return; } removeFromListUnderLock(buffer); @@ -523,7 +534,7 @@ private void 
removeFromListAndUnlock(LlapCacheableBuffer buffer) { } private void removeFromListUnderLock(LlapCacheableBuffer buffer) { - buffer.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; + buffer.cacheAttribute.setIndex(NOT_IN_CACHE); boolean isTail = buffer == listTail, isHead = buffer == listHead; if ((isTail != (buffer.next == null)) || (isHead != (buffer.prev == null))) { debugDumpListOnError(buffer); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LrfuCacheAttribute.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LrfuCacheAttribute.java new file mode 100644 index 0000000000..486f907bd1 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LrfuCacheAttribute.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.cache; + +public final class LrfuCacheAttribute implements LlapCacheableBuffer.CacheAttribute { + /** Priority for cache policy (should be pretty universal). */ + public double priority; + /** Last priority update time for cache policy (should be pretty universal). */ + public long lastUpdate = -1; + /** Index in heap for LRFU/LFU cache policies. 
*/ + public int indexInHeap = LowLevelLrfuCachePolicy.NOT_IN_CACHE; + + public LrfuCacheAttribute(double priority, long lastUpdate) { + this.priority = priority; + this.lastUpdate = lastUpdate; + } + + @Override public String toString() { + return "LrfuCacheAttribute{" + + "priority=" + + priority + + ", lastUpdate=" + + lastUpdate + + ", indexInHeap=" + + indexInHeap + + '}'; + } + + @Override public double getPriority() { + return priority; + } + + @Override public void setPriority(double priority) { + this.priority = priority; + } + + @Override public long getLastUpdate() { + return lastUpdate; + } + + @Override public void setTouchTime(long time) { + lastUpdate = time; + } + + @Override public int getIndex() { + return indexInHeap; + } + + @Override public void setIndex(int index) { + indexInHeap = index; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/GenericDataCache.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/GenericDataCache.java new file mode 100644 index 0000000000..bfa51df7ba --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/GenericDataCache.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.io.api.impl; + +import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.CacheTag; +import org.apache.hadoop.hive.common.io.DataCache; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.llap.cache.BufferUsageManager; +import org.apache.hadoop.hive.llap.cache.LlapDataBuffer; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; + +/** + * Cache implementation for generic data formats like Parquet, where we can't use the full LLAP IO elevator. + */ +class GenericDataCache implements DataCache, Allocator.BufferObjectFactory { + private final LowLevelCache lowLevelCache; + private final BufferUsageManager bufferManager; + + public GenericDataCache(LowLevelCache lowLevelCache, BufferUsageManager bufferManager) { + this.lowLevelCache = lowLevelCache; + this.bufferManager = bufferManager; + } + + @Override + public DiskRangeList getFileData(Object fileKey, DiskRangeList range, + long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) { + // TODO: we currently pass null counters because this doesn't use LlapRecordReader. + // Create counters for non-elevator-using fragments also? 
+ return lowLevelCache.getFileData(fileKey, range, baseOffset, factory, null, gotAllData); + } + + @Override + public long[] putFileData(Object fileKey, DiskRange[] ranges, + MemoryBuffer[] data, long baseOffset) { + return putFileData(fileKey, ranges, data, baseOffset, null); + } + + @Override + public long[] putFileData(Object fileKey, DiskRange[] ranges, + MemoryBuffer[] data, long baseOffset, CacheTag tag) { + return lowLevelCache.putFileData( + fileKey, ranges, data, baseOffset, LowLevelCache.Priority.NORMAL, null, tag); + } + + @Override + public void releaseBuffer(MemoryBuffer buffer) { + bufferManager.decRefBuffer(buffer); + } + + @Override + public void reuseBuffer(MemoryBuffer buffer) { + boolean isReused = bufferManager.incRefBuffer(buffer); + assert isReused; + } + + @Override + public Allocator getAllocator() { + return bufferManager.getAllocator(); + } + + @Override + public Allocator.BufferObjectFactory getDataBufferFactory() { + return this; + } + + @Override + public MemoryBuffer create() { + return new LlapDataBuffer(); + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index fadefa20bc..a5403fd323 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -18,46 +18,36 @@ package org.apache.hadoop.hive.llap.io.api.impl; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.OptionalInt; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import javax.management.ObjectName; -import org.apache.hadoop.hive.common.io.CacheTag; +import com.google.common.base.Preconditions; import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.io.Allocator; -import org.apache.hadoop.hive.common.io.DataCache; -import org.apache.hadoop.hive.common.io.DiskRange; -import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.FileMetadataCache; -import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory; -import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.cache.BuddyAllocator; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; -import org.apache.hadoop.hive.llap.cache.CacheContentsTracker; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; -import org.apache.hadoop.hive.llap.cache.LlapDataBuffer; import org.apache.hadoop.hive.llap.cache.LlapIoDebugDump; import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl; import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager; import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy; -import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy; -import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy; import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl; import org.apache.hadoop.hive.llap.cache.SimpleAllocator; import org.apache.hadoop.hive.llap.cache.SimpleBufferManager; -import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer; @@ -76,13 +66,6 @@ import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hive.common.util.FixedSizedObjectPool; - - - - - - -import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ThreadFactoryBuilder; 
public class LlapIoImpl implements LlapIo, LlapIoDebugDump { @@ -91,143 +74,148 @@ public static final Logger CACHE_LOGGER = LoggerFactory.getLogger("LlapIoCache"); public static final Logger LOCKING_LOGGER = LoggerFactory.getLogger("LlapIoLocking"); private static final String MODE_CACHE = "cache"; + private static final String DISPLAY_NAME = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName(); + private static final String IO_DISPLAY_NAME = "LlapDaemonIOMetrics-" + MetricsUtils.getHostName(); // TODO: later, we may have a map private final ColumnVectorProducer orcCvp, genericCvp; private final ExecutorService executor; private final LlapDaemonCacheMetrics cacheMetrics; private final LlapDaemonIOMetrics ioMetrics; - private ObjectName buddyAllocatorMXBean; + private final ObjectName buddyAllocatorMXBean; private final Allocator allocator; private final FileMetadataCache fileMetadataCache; private final LowLevelCache dataCache; private final BufferUsageManager bufferManager; private final Configuration daemonConf; private final LowLevelCacheMemoryManager memoryManager; - - private List debugDumpComponents = new ArrayList<>(); - - private LlapIoImpl(Configuration conf) throws IOException { - this.daemonConf = conf; - String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE); - boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode); - LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? 
LlapIoImpl.MODE_CACHE : "none"); - String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName(); - String sessionId = conf.get("llap.daemon.metrics.sessionid"); - this.cacheMetrics = LlapDaemonCacheMetrics.create(displayName, sessionId); - - displayName = "LlapDaemonIOMetrics-" + MetricsUtils.getHostName(); - String[] strIntervals = HiveConf.getTrimmedStringsVar(conf, - HiveConf.ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS); - List intervalList = new ArrayList<>(); - if (strIntervals != null) { - for (String strInterval : strIntervals) { - try { - intervalList.add(Integer.valueOf(strInterval)); - } catch (NumberFormatException e) { - LOG.warn("Ignoring IO decoding metrics interval {} from {} as it is invalid", strInterval, - Arrays.toString(strIntervals)); - } - } + private final List debugDumpComponents = new ArrayList<>(); + + /** + * Llap IO is created via Reflection by {@link org.apache.hadoop.hive.llap.io.api.LlapProxy}. + * + * @param conf Configuration containing all the needed parameters and flags to initialize LLAP IO. + */ + private LlapIoImpl(Configuration conf) { + this.daemonConf = Preconditions.checkNotNull(conf); + final boolean + useLowLevelCache = + LlapIoImpl.MODE_CACHE.equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE)); + final boolean isEncodeEnabled = useLowLevelCache && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED); + LOG.info("Initializing LLAP IO in {} mode, with Encoding = {} ", + useLowLevelCache ? 
LlapIoImpl.MODE_CACHE : "none", + isEncodeEnabled); + + //setup metrics reporters + final String sessionId = conf.get("llap.daemon.metrics.sessionid"); + final String[] + strIntervals = + HiveConf.getTrimmedStringsVar(conf, HiveConf.ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS); + int[] intArray = stringsToIntegers(strIntervals); + if (strIntervals != null && strIntervals.length != intArray.length) { + LOG.warn("Ignoring IO decoding metrics interval from {} as it is invalid due to Number format exception", + Arrays.toString(strIntervals)); } - this.ioMetrics = LlapDaemonIOMetrics.create(displayName, sessionId, Ints.toArray(intervalList)); + this.ioMetrics = LlapDaemonIOMetrics.create(IO_DISPLAY_NAME, sessionId, intArray); + this.cacheMetrics = LlapDaemonCacheMetrics.create(DISPLAY_NAME, sessionId); + LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", IO_DISPLAY_NAME, sessionId); - LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", displayName, - sessionId); + int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE); + this.executor = + new StatsRecordingThreadPool(numThreads, + numThreads, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()); + LOG.info("Created IO Elevator Thread pool with {} Threads ", numThreads); + // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?) 
+ FixedSizedObjectPool tracePool = IoTrace.createTracePool(conf, numThreads); - MetadataCache metadataCache = null; - SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed - BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null; - boolean isEncodeEnabled = useLowLevelCache - && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED); + final MetadataCache metadataCache; + final BufferUsageManager bufferManagerOrc, bufferManagerGeneric; + final SerDeLowLevelCacheImpl serdeCache; // TODO: extract interface when needed if (useLowLevelCache) { // Memory manager uses cache policy to trigger evictions, so create the policy first. - boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU); - long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); - int minAllocSize = (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); - LowLevelCachePolicy - realCachePolicy = - useLrfu ? new LowLevelLrfuCachePolicy(minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy(); - boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE); - LowLevelCachePolicy cachePolicyWrapper; - if (trackUsage) { - cachePolicyWrapper = new CacheContentsTracker(realCachePolicy); - } else { - cachePolicyWrapper = realCachePolicy; - } + final long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); + + final LowLevelCachePolicy cachePolicyWrapper = LowLevelCachePolicy.provideFromConf(conf); + // Allocator uses memory manager to request memory, so create the manager next. 
- this.memoryManager = new LowLevelCacheMemoryManager( - totalMemorySize, cachePolicyWrapper, cacheMetrics); - cacheMetrics.setCacheCapacityTotal(totalMemorySize); + this.memoryManager = new LowLevelCacheMemoryManager(totalMemorySize, cachePolicyWrapper, cacheMetrics); + this.cacheMetrics.setCacheCapacityTotal(totalMemorySize); // Cache uses allocator to allocate and deallocate, create allocator and then caches. BuddyAllocator allocator = new BuddyAllocator(conf, memoryManager, cacheMetrics); this.allocator = allocator; - LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl( - cacheMetrics, cachePolicyWrapper, allocator, true); - dataCache = cacheImpl; + LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(cacheMetrics, cachePolicyWrapper, allocator, true); + this.dataCache = cacheImpl; if (isEncodeEnabled) { - SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl( - cacheMetrics, cachePolicyWrapper, allocator); - serdeCache = serdeCacheImpl; - serdeCacheImpl.setConf(conf); + serdeCache = new SerDeLowLevelCacheImpl(cacheMetrics, cachePolicyWrapper, allocator); + serdeCache.setConf(conf); + } else { + serdeCache = null; } - - boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE); - metadataCache = new MetadataCache( - allocator, memoryManager, cachePolicyWrapper, useGapCache, cacheMetrics); - fileMetadataCache = metadataCache; + final boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE); + metadataCache = new MetadataCache(allocator, memoryManager, cachePolicyWrapper, useGapCache, cacheMetrics); + this.fileMetadataCache = metadataCache; // And finally cache policy uses cache to notify it of eviction. The cycle is complete! 
- EvictionDispatcher e = new EvictionDispatcher( - dataCache, serdeCache, metadataCache, allocator); + final EvictionDispatcher e = new EvictionDispatcher(dataCache, serdeCache, metadataCache, allocator); cachePolicyWrapper.setEvictionListener(e); - cacheImpl.startThreads(); // Start the cache threads. bufferManager = bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager. bufferManagerGeneric = serdeCache; - if (trackUsage) { - debugDumpComponents.add(cachePolicyWrapper); // Cache contents tracker. - } - debugDumpComponents.add(realCachePolicy); + debugDumpComponents.add(cachePolicyWrapper); // Cache contents tracker. debugDumpComponents.add(cacheImpl); if (serdeCache != null) { debugDumpComponents.add(serdeCache); } - if (metadataCache != null) { - debugDumpComponents.add(metadataCache); - } + debugDumpComponents.add(metadataCache); debugDumpComponents.add(allocator); } else { this.allocator = new SimpleAllocator(conf); - fileMetadataCache = null; - SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics); - bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm; - dataCache = sbm; + final SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics); + this.bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm; + this.dataCache = sbm; + this.debugDumpComponents.add(sb -> sb.append("LLAP IO allocator is not in use!")); + this.fileMetadataCache = null; this.memoryManager = null; - debugDumpComponents.add(new LlapIoDebugDump() { - @Override - public void debugDumpShort(StringBuilder sb) { - sb.append("LLAP IO allocator is not in use!"); - } - }); + serdeCache = null; + metadataCache = null; } - // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?) 
- int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE); - executor = new StatsRecordingThreadPool(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), - new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()); - FixedSizedObjectPool tracePool = IoTrace.createTracePool(conf); - // TODO: this should depends on input format and be in a map, or something. - this.orcCvp = new OrcColumnVectorProducer( - metadataCache, dataCache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool); - this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer( - serdeCache, bufferManagerGeneric, conf, cacheMetrics, ioMetrics, tracePool) : null; - LOG.info("LLAP IO initialized"); - registerMXBeans(); + // TODO: this should depends on input format and be in a map, or something. + this.orcCvp = + new OrcColumnVectorProducer(metadataCache, + dataCache, + bufferManagerOrc, + conf, + cacheMetrics, + ioMetrics, + tracePool); + this.genericCvp = + isEncodeEnabled ? 
+ new GenericColumnVectorProducer(serdeCache, + bufferManagerGeneric, + conf, + cacheMetrics, + ioMetrics, + tracePool) : + null; + this.buddyAllocatorMXBean = MBeans.register("LlapDaemon", "BuddyAllocatorInfo", allocator); + LOG.info("LLAP IO successful initialized."); } - private void registerMXBeans() { - buddyAllocatorMXBean = MBeans.register("LlapDaemon", "BuddyAllocatorInfo", allocator); + private static int[] stringsToIntegers(String[] strings) { + if (strings == null) { + return new int[0]; + } + return Arrays.stream(strings).map(x -> { + try { + return OptionalInt.of(Integer.parseInt(x)); + } catch (NumberFormatException e) { + return OptionalInt.empty(); + } + }).filter(OptionalInt::isPresent).mapToInt(OptionalInt::getAsInt).toArray(); } @Override @@ -248,11 +236,9 @@ public long purge() { @Override public InputFormat getInputFormat( InputFormat sourceInputFormat, Deserializer sourceSerDe) { - ColumnVectorProducer cvp = genericCvp; - if (sourceInputFormat instanceof OrcInputFormat) { - cvp = orcCvp; // Special-case for ORC. - } else if (cvp == null) { - LOG.warn("LLAP encode is disabled; cannot use for " + sourceInputFormat.getClass()); + ColumnVectorProducer cvp = sourceInputFormat instanceof OrcInputFormat ? 
orcCvp : genericCvp; + if (cvp == null) { + LOG.warn("LLAP encode is disabled; cannot use for {} ", sourceInputFormat.getClass()); return null; } return new LlapInputFormat(sourceInputFormat, sourceSerDe, cvp, executor, daemonConf); @@ -260,10 +246,9 @@ public long purge() { @Override public void close() { - LOG.info("Closing LlapIoImpl.."); + LOG.info("Closing LlapIoImpl."); if (buddyAllocatorMXBean != null) { MBeans.unregister(buddyAllocatorMXBean); - buddyAllocatorMXBean = null; } executor.shutdownNow(); } @@ -272,65 +257,7 @@ public void close() { @Override public void initCacheOnlyInputFormat(InputFormat inputFormat) { LlapCacheOnlyInputFormatInterface cacheIf = (LlapCacheOnlyInputFormatInterface)inputFormat; - cacheIf.injectCaches(fileMetadataCache, - new GenericDataCache(dataCache, bufferManager), daemonConf); - } - - private class GenericDataCache implements DataCache, BufferObjectFactory { - private final LowLevelCache lowLevelCache; - private final BufferUsageManager bufferManager; - - public GenericDataCache(LowLevelCache lowLevelCache, BufferUsageManager bufferManager) { - this.lowLevelCache = lowLevelCache; - this.bufferManager = bufferManager; - } - - @Override - public DiskRangeList getFileData(Object fileKey, DiskRangeList range, - long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) { - // TODO: we currently pass null counters because this doesn't use LlapRecordReader. - // Create counters for non-elevator-using fragments also? 
- return lowLevelCache.getFileData(fileKey, range, baseOffset, factory, null, gotAllData); - } - - @Override - public long[] putFileData(Object fileKey, DiskRange[] ranges, - MemoryBuffer[] data, long baseOffset) { - return putFileData(fileKey, ranges, data, baseOffset, null); - } - - @Override - public long[] putFileData(Object fileKey, DiskRange[] ranges, - MemoryBuffer[] data, long baseOffset, CacheTag tag) { - return lowLevelCache.putFileData( - fileKey, ranges, data, baseOffset, Priority.NORMAL, null, tag); - } - - @Override - public void releaseBuffer(MemoryBuffer buffer) { - bufferManager.decRefBuffer(buffer); - } - - @Override - public void reuseBuffer(MemoryBuffer buffer) { - boolean isReused = bufferManager.incRefBuffer(buffer); - assert isReused; - } - - @Override - public Allocator getAllocator() { - return bufferManager.getAllocator(); - } - - @Override - public BufferObjectFactory getDataBufferFactory() { - return this; - } - - @Override - public MemoryBuffer create() { - return new LlapDataBuffer(); - } + cacheIf.injectCaches(fileMetadataCache, new GenericDataCache(dataCache, bufferManager), daemonConf); } @Override diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java index 10bd736486..e544ef8e86 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java @@ -62,8 +62,7 @@ public MetadataCache(BuddyAllocator allocator, MemoryManager memoryManager, this.allocator = allocator; this.policy = policy; this.metrics = metrics; - this.estimateErrors = useEstimateCache - ? new ConcurrentHashMap() : null; + this.estimateErrors = useEstimateCache ? 
new ConcurrentHashMap<>() : null; } public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset, AtomicBoolean isStopped) { @@ -235,7 +234,7 @@ private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, if (maxAlloc < length) { largeBuffers = new LlapMetadataBuffer[length / maxAlloc]; for (int i = 0; i < largeBuffers.length; ++i) { - largeBuffers[i] = new LlapMetadataBuffer(fileKey, tag); + largeBuffers[i] = new LlapMetadataBuffer<>(fileKey, tag); } allocator.allocateMultiple(largeBuffers, maxAlloc, null, isStopped); for (int i = 0; i < largeBuffers.length; ++i) { @@ -256,7 +255,7 @@ private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, LlapMetadataBuffer[] cacheData = new LlapMetadataBuffer[largeBuffers.length + 1]; System.arraycopy(largeBuffers, 0, cacheData, 0, largeBuffers.length); cacheData[largeBuffers.length] = smallBuffer[0]; - return new LlapMetadataBuffers(cacheData); + return new LlapMetadataBuffers<>(cacheData); } } } @@ -340,16 +339,16 @@ public void decRefBuffer(MemoryBufferOrBuffers buffer) { if (result != null) return result; if (tailBuffer.remaining() <= allocator.getMaxAllocation()) { // The common case by far. 
- return wrapSmallBb(new LlapMetadataBuffer(key, tag), tailBuffer, isStopped); + return wrapSmallBb(new LlapMetadataBuffer<>(key, tag), tailBuffer, isStopped); } else { int allocCount = determineAllocCount(tailBuffer); @SuppressWarnings("unchecked") LlapMetadataBuffer[] results = new LlapMetadataBuffer[allocCount]; for (int i = 0; i < allocCount; ++i) { - results[i] = new LlapMetadataBuffer(key, tag); + results[i] = new LlapMetadataBuffer<>(key, tag); } wrapLargeBb(results, tailBuffer, isStopped); - return new LlapMetadataBuffers(results); + return new LlapMetadataBuffers<>(results); } } @@ -406,7 +405,8 @@ private boolean lockBuffer(LlapBufferOrBuffers buffers, boolean doNotifyPolicy) for (int i = 0; i < bufferArray.length; ++i) { if (lockOneBuffer(bufferArray[i], doNotifyPolicy)) continue; for (int j = 0; j < i; ++j) { - unlockSingleBuffer(buffer, true); + //@TODO this was a bug by itself: the wrong buffer was being unlocked. + unlockSingleBuffer(bufferArray[j], true); } discardMultiBuffer(buffers); return false; } @@ -461,8 +461,8 @@ private void unlockBuffer(LlapBufferOrBuffers buffers, boolean isCached) { } private void unlockSingleBuffer(LlapAllocatorBuffer buffer, boolean isCached) { - boolean isLastDecref = (buffer.decRef() == 0); - if (isLastDecref) { + boolean isLastDecRef = (buffer.decRef() == 0); + if (isLastDecRef) { if (isCached) { policy.notifyUnlock(buffer); } else { @@ -497,7 +497,7 @@ public boolean equals(Object obj) { } } - public static interface LlapBufferOrBuffers extends MemoryBufferOrBuffers { + public interface LlapBufferOrBuffers extends MemoryBufferOrBuffers { LlapAllocatorBuffer getSingleLlapBuffer(); LlapAllocatorBuffer[] getMultipleLlapBuffers(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java index 33e16802df..0d7ab04b57 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java +++ 
llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java @@ -32,6 +32,11 @@ import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator; import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator; +/** + * Metadata cache entry that holds information about files. + * No actually allocated byte buffers are used by this class, + * thus it is handled in a different way from {@link org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer} + */ public class OrcFileEstimateErrors extends LlapCacheableBuffer { private final Object fileKey; private int estimatedMemUsage; @@ -118,6 +123,18 @@ public void notifyEvicted(EvictionDispatcher evictionDispatcher) { evictionDispatcher.notifyEvicted(this); } + @Override public void setClockBit() { + //no op + } + + @Override public void unSetClockBit() { + //no op + } + + @Override public boolean isClockBitSet() { + return false; + } + @Override protected boolean isLocked() { return false; diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/ClockCachePolicyTest.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/ClockCachePolicyTest.java new file mode 100644 index 0000000000..03c4581cc3 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/ClockCachePolicyTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.cache; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.io.CacheTag; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.validation.constraints.NotNull; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer.INVALIDATE_FAILED; +import static org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer.INVALIDATE_OK; + +@RunWith(Parameterized.class) +public class ClockCachePolicyTest { + private static final int BUFFER_SIZE = 16; + private static final long SEED = 9999; + public static final Predicate IS_EVEN = x -> x % 2 == 0; + public static final Function MOD_3 = x -> x % 3; + private final int numPages; + + ClockCachePolicyTest.EvictionTracker defaultEvictionListener = new ClockCachePolicyTest.EvictionTracker(); + + public ClockCachePolicyTest(int numElements) { + this.numPages = numElements; + } + + @Parameterized.Parameters + public static Collection collectioNumbers() { + return Arrays.asList(new Object[][] { + { 0 }, + { 1 }, + { 2 }, + { 5 }, + { 10009 }, + { 20002 } + }); + } + + @Test 
public void testCacheCalls() { + defaultEvictionListener.evicted.clear(); + List access = getLlapCacheableBuffers(numPages); + Collections.shuffle(access, new Random(SEED)); + ClockCachePolicy policy = new ClockCachePolicy(); + policy.setEvictionListener(defaultEvictionListener); + access.forEach(b -> policy.cache(b, LowLevelCache.Priority.NORMAL)); + //expect that clock hand is the first element and last element will be clockHand.prev + LlapCacheableBuffer currentHand = policy.getClockHand(); + LlapCacheableBuffer lastElement = numPages == 0 ? null : currentHand.prev; + if (numPages == 0) { + // no reason to test for this case + return; + } + int i = 0; + while (currentHand != lastElement) { + Assert.assertEquals(String.format("Buffers at iteration [%d] not matching", i), currentHand, access.get(i)); + currentHand = currentHand.next; + i++; + } + Assert.assertEquals(lastElement, access.size() == 0 ? null : access.get(i)); + Assert.assertEquals(i, numPages - 1); + + // going counter clock wise + i = access.size() - 1; + currentHand = policy.getClockHand().prev; + lastElement = currentHand.next; + while (currentHand != lastElement) { + Assert.assertEquals(String.format("Buffers at iteration [%d] not matching", i), currentHand, access.get(i)); + currentHand = currentHand.prev; + i--; + } + Assert.assertEquals(lastElement, access.get(i)); + Assert.assertEquals(i, 0); + } + + @Test(timeout = 6000L) public void testEvictionOneByOne() { + defaultEvictionListener.evicted.clear(); + List access = getLlapCacheableBuffers(numPages); + Collections.shuffle(access, new Random(SEED)); + ClockCachePolicy policy = new ClockCachePolicy(5); + policy.setEvictionListener(defaultEvictionListener); + access.forEach(b -> policy.cache(b, LowLevelCache.Priority.NORMAL)); + // set the flag for buffer with even index + for (int i = 0; i < access.size(); i++) { + if (i % 2 == 0 ) { + // touch some buffers. 
+ policy.notifyLock(access.get(i)); + } + } + + while (policy.getClockHand() != null) { + // should not over evict + Assert.assertEquals(BUFFER_SIZE, policy.evictSomeBlocks(BUFFER_SIZE)); + } + Assert.assertEquals(defaultEvictionListener.evicted.size(), numPages); + // check the order of the eviction + for (int i = 0; i < access.size() / 2; i++) { + Assert.assertEquals(defaultEvictionListener.evicted.get(i), access.get(i * 2 + 1)); + Assert.assertEquals(defaultEvictionListener.evicted.get(i + access.size() / 2), access.get(i * 2)); + } + + } + + @Test public void testEvictionWithLockedBuffers() { + defaultEvictionListener.evicted.clear(); + List access = getLlapCacheableBuffers(numPages, IS_EVEN); + Collections.shuffle(access, new Random(SEED)); + ClockCachePolicy policy = new ClockCachePolicy(5); + policy.setEvictionListener(defaultEvictionListener); + access.forEach(b -> policy.cache(b, LowLevelCache.Priority.NORMAL)); + List + unlockedBuffers = + access.stream() + .filter(llapCacheableBuffer -> llapCacheableBuffer.invalidate() == INVALIDATE_OK) + .collect(Collectors.toList()); + Assert.assertEquals(unlockedBuffers.stream().mapToLong(LlapCacheableBuffer::getMemoryUsage).sum(), policy.purge()); + } + + + @Test public void testEvictionWithLockedBuffersAndInvalid() { + defaultEvictionListener.evicted.clear(); + List access = getLlapCacheableBuffersWithFn(numPages, MOD_3); + Collections.shuffle(access, new Random(SEED)); + ClockCachePolicy policy = new ClockCachePolicy(5); + policy.setEvictionListener(defaultEvictionListener); + access.forEach(b -> policy.cache(b, LowLevelCache.Priority.NORMAL)); + List + invalidateOkBuffers = + access.stream() + .filter(llapCacheableBuffer -> llapCacheableBuffer.invalidate() == INVALIDATE_OK) + .collect(Collectors.toList()); + + List + lockedBuffer = + access.stream() + .filter(llapCacheableBuffer -> llapCacheableBuffer.invalidate() == INVALIDATE_FAILED) + .collect(Collectors.toList()); + + 
Assert.assertEquals(invalidateOkBuffers.stream().mapToLong(LlapCacheableBuffer::getMemoryUsage).sum(), policy.purge()); + List actualLocked = Lists.newArrayList(policy.getIterator()); + Assert.assertEquals(lockedBuffer, actualLocked); + } + + @Test public void testPurgeWhenAllUnlocked() { + defaultEvictionListener.evicted.clear(); + List access = getLlapCacheableBuffers(numPages); + Collections.shuffle(access, new Random(SEED)); + ClockCachePolicy policy = new ClockCachePolicy(); + policy.setEvictionListener(defaultEvictionListener); + access.forEach(b -> policy.cache(b, LowLevelCache.Priority.NORMAL)); + policy.purge(); + Assert.assertEquals(defaultEvictionListener.evicted.size(), numPages); + for (int i = 0; i < access.size(); i++) { + Assert.assertEquals(defaultEvictionListener.evicted.get(i), access.get(i)); + } + } + + + @Test public void testPurgeWhenAllLocked() { + defaultEvictionListener.evicted.clear(); + List access = getLlapCacheableBuffers(numPages, i -> false); + Collections.shuffle(access, new Random(SEED)); + ClockCachePolicy policy = new ClockCachePolicy(); + policy.setEvictionListener(defaultEvictionListener); + access.forEach(b -> policy.cache(b, LowLevelCache.Priority.NORMAL)); + Assert.assertEquals(0, policy.purge()); + } + + @NotNull public static List getLlapCacheableBuffers(int numPages) { + return getLlapCacheableBuffers(numPages, i -> true); + } + + @NotNull public static List getLlapCacheableBuffers(int numPages, + Predicate invalidatePredicate) { + return IntStream.range(0, numPages) + .mapToObj(i -> new TestLlapCacheableBuffer(BUFFER_SIZE, + invalidatePredicate.test(i) ? 
INVALIDATE_OK : INVALIDATE_FAILED, + String.valueOf(i))) + .collect(Collectors.toList()); + } + + @NotNull public static List getLlapCacheableBuffersWithFn(int numPages, + Function invalidatePredicate) { + return IntStream.range(0, numPages) + .mapToObj(i -> new TestLlapCacheableBuffer(BUFFER_SIZE, + invalidatePredicate.apply(i), + String.valueOf(i))) + .collect(Collectors.toList()); + } + + public void testSimpleSequentialAccess() { + int numPages = 1001; + ClockCachePolicyTest.EvictionTracker evictionListener = new ClockCachePolicyTest.EvictionTracker(); + List + access = + IntStream.range(0, numPages) + .mapToObj(i -> new ClockCachePolicyTest.TestLlapCacheableBuffer(BUFFER_SIZE, + i % 2 == 0 ? INVALIDATE_OK : INVALIDATE_FAILED, + String.valueOf(i))) + .collect(Collectors.toList()); + Collections.shuffle(access, ThreadLocalRandom.current()); + ClockCachePolicy policy = new ClockCachePolicy(); + policy.setEvictionListener(evictionListener); + access.forEach(b -> policy.cache(b, LowLevelCache.Priority.NORMAL)); + Assert.assertEquals(policy.getClockHand().prev, access.get(access.size() - 1)); + Assert.assertEquals(policy.getClockHand(), access.size() > 1 ? access.get(0) : null); + Collections.shuffle(access, ThreadLocalRandom.current()); + access.forEach(b -> { + policy.notifyLock(b); + policy.notifyUnlock(b); + }); + //Assert.assertEquals(policy.getClockHand().prev, access.get(access.size() - 1)); + //Assert.assertEquals(policy.getClockHand(), access.size() > 1 ? access.get(0) : null); + Assert.assertEquals(access.stream() + .filter(b -> b.invalidate() == INVALIDATE_OK) + .mapToLong(LlapCacheableBuffer::getMemoryUsage) + .sum(), policy.purge()); + Assert.assertArrayEquals(access.stream().filter(b -> b.invalidate() == INVALIDATE_OK).toArray(), + evictionListener.evicted.toArray()); + Assert.assertNull(policy.getClockHand()); + // re add more stuff to the cache. 
+ access.forEach(policy::notifyUnlock); + Assert.assertEquals(policy.getClockHand(), access.get(access.size() - 1)); + Assert.assertEquals(policy.getClockHand(), access.size() > 1 ? access.get(0) : null); + //test eviction of some blocks + long targetSize = BUFFER_SIZE * 5; + evictionListener.evicted.clear(); + long evicted = policy.evictSomeBlocks(targetSize); + Assert.assertEquals(targetSize, evicted); + Assert.assertEquals(evicted, + evictionListener.evicted.stream().mapToLong(LlapCacheableBuffer::getMemoryUsage).sum()); + } + + private static class TestLlapCacheableBuffer extends LlapAllocatorBuffer { + private final long size; + private int state; + private final String tag; + + private TestLlapCacheableBuffer(long size, int state, String tag) { + this.size = size; + this.state = state; + this.tag = tag; + } + + @Override public int invalidate() { + return state; + } + + @Override public long getMemoryUsage() { + return size; + } + + @Override public void notifyEvicted(EvictionDispatcher evictionDispatcher) { + evictionDispatcher.notifyEvicted(this); + } + + + + @Override public CacheTag getTag() { + return CacheTag.build(tag); + } + + @Override public boolean isLocked() { + return state != INVALIDATE_OK; + } + } + + private class EvictionTracker implements EvictionListener { + public List evicted = new ArrayList<>(); + + @Override public void notifyEvicted(LlapCacheableBuffer buffer) { + evicted.add(buffer); + } + } + + @Test + public void testCyclicProperty(){ + List access = getLlapCacheableBuffers(numPages); + Collections.shuffle(access, new Random(SEED)); + ClockCachePolicy policy = new ClockCachePolicy(); + policy.setEvictionListener(defaultEvictionListener); + access.forEach(b -> policy.cache(b, LowLevelCache.Priority.NORMAL)); + + LlapCacheableBuffer root = policy.getClockHand(); + LlapCacheableBuffer clockHand = root; + + if (numPages == 0 ) { + Assert.assertNull(clockHand); + return;// base case empty circle; + } + int i = 0; + do { + 
Assert.assertNotNull(clockHand.next); + Assert.assertEquals(clockHand.next.prev, clockHand); + clockHand = clockHand.next; + i++; + } while (clockHand != root); + Assert.assertEquals("Clock entries size doesn't match", i, numPages); + } +} \ No newline at end of file diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/LlapAllocatorBufferStateTest.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/LlapAllocatorBufferStateTest.java new file mode 100644 index 0000000000..59092a6242 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/LlapAllocatorBufferStateTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.cache; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test Class for the buffer state + */ +public class LlapAllocatorBufferStateTest { + // The clock bit is the most significant bit of the 64 bit word + @Test + public void testTestFixtureAsserts(){ + Assert.assertEquals(Long.MIN_VALUE, LlapAllocatorBuffer.State.CLOCK_BIT_MASK); + Assert.assertEquals(64, + LlapAllocatorBuffer.State.FLAGS_WIDTH + + LlapAllocatorBuffer.State.REFCOUNT_WIDTH + + LlapAllocatorBuffer.State.ARENA_WIDTH + + LlapAllocatorBuffer.State.HEADER_WIDTH + + LlapAllocatorBuffer.State.CLOCK_BIT_WIDTH); + } + @Test + public void testSetClockBit() { + long state = 4999L; + Assert.assertFalse(LlapAllocatorBuffer.State.isClockBitSet(state)); + Assert.assertEquals(Long.MIN_VALUE | state, LlapAllocatorBuffer.State.setClockBit(state)); + state = -4999L; + Assert.assertTrue(LlapAllocatorBuffer.State.isClockBitSet(state)); + Assert.assertEquals(Long.MIN_VALUE | state, LlapAllocatorBuffer.State.setClockBit(state)); + } + + @Test + public void testUnsetClockBit() { + long state = -4888L; + Assert.assertTrue(LlapAllocatorBuffer.State.isClockBitSet(state)); + Assert.assertEquals(~(Long.MIN_VALUE) & state, LlapAllocatorBuffer.State.unSetClockBit(state)); + state = 4555L; + Assert.assertFalse(LlapAllocatorBuffer.State.isClockBitSet(state)); + Assert.assertEquals(~(Long.MIN_VALUE) & state, LlapAllocatorBuffer.State.unSetClockBit(state)); + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index fbe58ff919..c5efd7f12e 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -72,7 +72,7 @@ public void testRegression_HIVE_12178() throws Exception { // buffer2 is now in the heap, 
buffer1 is in the list. "Use" buffer1 again; // before we notify though, lock the list, so lock cannot remove it from the list. buffer1.incRef(); - assertEquals(LlapCacheableBuffer.IN_LIST, buffer1.indexInHeap); + assertEquals(LowLevelLrfuCachePolicy.IN_LIST, ((LrfuCacheAttribute) buffer1.cacheAttribute).indexInHeap); listLock.lock(); try { Thread otherThread = new Thread(new Runnable() { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/IoTrace.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/IoTrace.java index bfc82b591c..dca7e28976 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/IoTrace.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/IoTrace.java @@ -459,18 +459,15 @@ public void logCompositeOrcCb(int lastChunkTaken, int lastChunkRemaining, DiskRa this.offset += 5; } - public static FixedSizedObjectPool createTracePool(Configuration conf) { - final int ioTraceSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_TRACE_SIZE); + public static FixedSizedObjectPool createTracePool(Configuration conf, int ioThreads) { + final int ioTraceSize = (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_TRACE_SIZE); final boolean isAlwaysDump = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_TRACE_ALWAYS_DUMP); - int ioThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_IO_THREADPOOL_SIZE); return new FixedSizedObjectPool<>(ioThreads, new Pool.PoolObjectHelper() { - @Override - public IoTrace create() { + @Override public IoTrace create() { return new IoTrace(ioTraceSize, isAlwaysDump); } - @Override - public void resetBeforeOffer(IoTrace t) { + @Override public void resetBeforeOffer(IoTrace t) { t.reset(); } }); diff --git storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java index ba6e5344ea..bc29e22e5e 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java +++ storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java 
@@ -239,7 +239,7 @@ public int hashCode() { /** * CacheTag for tables with more than one partition level. */ - public static final class MultiPartitionCacheTag extends PartitionCacheTag { + public static final class MultiPartitionCacheTag extends PartitionCacheTag { private final String[] partitionDesc; diff --git storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java index 9b23a710c4..da50c23efc 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java +++ storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java @@ -22,7 +22,7 @@ /** An abstract data cache that IO formats can use to retrieve and cache data. */ public interface DataCache { - public static final class BooleanRef { + final class BooleanRef { public boolean value; } @@ -57,8 +57,11 @@ * @param gotAllData An out param - whether all the requested data was found in cache. * @return The new or modified list of DiskRange-s, where some ranges may contain cached data. */ - DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, - DiskRangeListFactory factory, BooleanRef gotAllData); + DiskRangeList getFileData(Object fileKey, + DiskRangeList range, + long baseOffset, + DiskRangeListFactory factory, + BooleanRef gotAllData); @Deprecated long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset); @@ -109,6 +112,5 @@ DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, * @return null if all data was put; bitmask indicating which chunks were not put otherwise; * the replacement chunks from cache are updated directly in the array. */ - long[] putFileData(Object fileKey, DiskRange[] ranges, - MemoryBuffer[] data, long baseOffset, CacheTag tag); + long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset, CacheTag tag); }