diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index aedd1ec975..671c77b4d4 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1409,7 +1409,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "while writing a table with ORC file format, enabling this config will do stripe-level\n" + "fast merge for small ORC files. Note that enabling this config will not honor the\n" + "padding tolerance config (hive.exec.orc.block.padding.tolerance)."), - HIVE_ORC_CODEC_POOL("hive.use.orc.codec.pool", true, + HIVE_ORC_CODEC_POOL("hive.use.orc.codec.pool", false, "Whether to use codec pool in ORC. Disable if there are bugs with codec reuse."), HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true, @@ -3181,11 +3181,17 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_ALLOCATOR_DEFRAG_HEADROOM("hive.llap.io.allocator.defrag.headroom", "1Mb", "How much of a headroom to leave to allow allocator more flexibility to defragment.\n" + "The allocator would further cap it to a fraction of total memory."), + LLAP_TRACK_CACHE_USAGE("hive.llap.io.track.cache.usage", true, + "Whether to tag LLAP cache contents, mapping them to Hive entities (paths for\n" + + "partitions and tables) for reporting."), LLAP_USE_LRFU("hive.llap.io.use.lrfu", true, "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."), - LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f, + LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.000001f, "Lambda for ORC low-level cache LRFU cache policy. Must be in [0, 1]. 0 makes LRFU\n" + - "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly."), + "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly.\n" + + "The meaning of this parameter is the inverse of the number of time ticks (cache\n" + + " operations, currently) that cause the combined recency-frequency of a block in cache\n" + + " to be halved."), LLAP_CACHE_ALLOW_SYNTHETIC_FILEID("hive.llap.cache.allow.synthetic.fileid", false, "Whether LLAP cache should use synthetic file ID if real one is not available. Systems\n" + "like HDFS, Isilon, etc. provide a unique file/inode ID. On other FSes (e.g. local\n" + diff --git llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index aecaacccb4..50c0e22837 100644 --- llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -299,4 +301,69 @@ public static String humanReadableByteCount(long bytes) { LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddressVal); return server; } + + // Copied from AcidUtils so we don't have to put the code using this into ql. + // TODO: Ideally, AcidUtils class and various constants should be in common. 
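For context on the path-to-tag helper added just below: it derives a "db.table" tag, optionally with partition components appended after '/', which CacheContentsTracker later uses to attribute cache contents. The usage sketch that follows is illustrative only (hypothetical warehouse paths; expected outputs traced from the parsing logic in this patch), not part of the change:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.LlapUtil;

public class CacheTagExample {
  public static void main(String[] args) {
    // Unpartitioned managed table: the ".db" suffix pins the database component.
    // /apps/hive/warehouse/web.db/sales/000000_0 -> "web.sales"
    System.out.println(LlapUtil.getDbAndTableNameForMetrics(
        new Path("/apps/hive/warehouse/web.db/sales/000000_0"), true));
    // Partitioned table with includeParts == true: partition dirs are appended.
    // /apps/hive/warehouse/web.db/sales/ds=2017-01-01/000000_0 -> "web.sales/ds=2017-01-01"
    System.out.println(LlapUtil.getDbAndTableNameForMetrics(
        new Path("/apps/hive/warehouse/web.db/sales/ds=2017-01-01/000000_0"), true));
    // Same path with includeParts == false (as used for the TABLE counter) -> "web.sales"
    System.out.println(LlapUtil.getDbAndTableNameForMetrics(
        new Path("/apps/hive/warehouse/web.db/sales/ds=2017-01-01/000000_0"), false));
  }
}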
+ private static final String BASE_PREFIX = "base_", DELTA_PREFIX = "delta_", + DELETE_DELTA_PREFIX = "delete_delta_", BUCKET_PREFIX = "bucket_", + DATABASE_PATH_SUFFIX = ".db", UNION_SUDBIR_PREFIX = "HIVE_UNION_SUBDIR_"; + + public static final char DERIVED_ENTITY_PARTITION_SEPARATOR = '/'; + + public static String getDbAndTableNameForMetrics(Path path, boolean includeParts) { + String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR); + int dbIx = -1; + // Try to find the default db postfix; don't check two last components - at least there + // should be a table and file (we could also try to throw away partition/bucket/acid stuff). + for (int i = 0; i < parts.length - 2; ++i) { + if (!parts[i].endsWith(DATABASE_PATH_SUFFIX)) continue; + if (dbIx >= 0) { + dbIx = -1; // Let's not guess which one is correct. + break; + } + dbIx = i; + } + if (dbIx >= 0) { + String dbAndTable = parts[dbIx].substring( + 0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1]; + if (!includeParts) return dbAndTable; + for (int i = dbIx + 2; i < parts.length; ++i) { + if (!parts[i].contains("=")) break; + dbAndTable += "/" + parts[i]; + } + return dbAndTable; + } + + // Just go from the back and throw away everything we think is wrong; skip last item, the file. + boolean isInPartFields = false; + for (int i = parts.length - 2; i >= 0; --i) { + String p = parts[i]; + boolean isPartField = p.contains("="); + if ((isInPartFields && !isPartField) || (!isPartField && !isSomeHiveDir(p))) { + dbIx = i - 1; // Assume this is the table we are at now. + break; + } + isInPartFields = isPartField; + } + // If we found something before we ran out of components, use it. + if (dbIx >= 0) { + String dbName = parts[dbIx]; + if (dbName.endsWith(DATABASE_PATH_SUFFIX)) { + dbName = dbName.substring(0, dbName.length() - 3); + } + String dbAndTable = dbName + "." + parts[dbIx + 1]; + if (!includeParts) return dbAndTable; + for (int i = dbIx + 2; i < parts.length; ++i) { + if (!parts[i].contains("=")) break; + dbAndTable += "/" + parts[i]; + } + return dbAndTable; + } + return "unknown"; + } + + private static boolean isSomeHiveDir(String p) { + return p.startsWith(BASE_PREFIX) || p.startsWith(DELTA_PREFIX) || p.startsWith(BUCKET_PREFIX) + || p.startsWith(UNION_SUDBIR_PREFIX) || p.startsWith(DELETE_DELTA_PREFIX); + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java new file mode 100644 index 0000000000..4fbaac1db0 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.cache; + +import java.util.Iterator; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; + +/** + * A wrapper around cache eviction policy that tracks cache contents via tags. + */ +public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListener { + private static final long CLEANUP_TIME_MS = 3600 * 1000L, MIN_TIME_MS = 300 * 1000L; + + private final ConcurrentSkipListMap tagInfo = new ConcurrentSkipListMap<>(); + private EvictionListener evictionListener; + private LowLevelCachePolicy realPolicy; + private final Thread cleanupThread; + + public CacheContentsTracker(LowLevelCachePolicy realPolicy) { + this.realPolicy = realPolicy; + realPolicy.setEvictionListener(this); + this.cleanupThread = new Thread(new CleanupRunnable()); + this.cleanupThread.start(); + } + + private final class CleanupRunnable implements Runnable { + @Override + public void run() { + final long cleanupTimeNs = CLEANUP_TIME_MS * 1000000L; + long sleepTimeMs = CLEANUP_TIME_MS; + try { + while (true) { + Thread.sleep(sleepTimeMs); + long timeNs = System.nanoTime(); + long nextCleanupInNs = cleanupTimeNs; + Iterator iter = tagInfo.values().iterator(); + while (iter.hasNext()) { + TagState v = iter.next(); + synchronized (v) { + if (v.bufferCount > 0) continue; // The file is still cached. + long deltaNs = timeNs - v.emptyTimeNs; + if (deltaNs < cleanupTimeNs) { + nextCleanupInNs = Math.min(nextCleanupInNs, deltaNs); + continue; + } else { + iter.remove(); + } + } + } + sleepTimeMs = Math.max(MIN_TIME_MS, nextCleanupInNs / 1000000L); + } + } catch (InterruptedException ex) { + return; // Interrupted. + } + } + } + + private static class TagState { + public TagState(String name) { + this.name = name; + } + public final String name; + public long emptyTimeNs; + public long bufferCount, totalSize, maxCount, maxSize; + public boolean isRemoved = false; + } + + + private void reportCached(LlapCacheableBuffer buffer) { + long size = buffer.getMemoryUsage(); + TagState state; + do { + state = getTagState(buffer); + } while (!reportCached(state, size)); + state = null; + do { + state = getParentTagState(buffer); + if (state == null) break; + } while (!reportCached(state, size)); + } + + private boolean reportCached(TagState state, long size) { + synchronized (state) { + if (state.isRemoved) return false; + ++state.bufferCount; + state.totalSize += size; + state.maxSize = Math.max(state.maxSize, state.totalSize); + state.maxCount = Math.max(state.maxCount, state.bufferCount); + } + return true; + } + + private void reportRemoved(LlapCacheableBuffer buffer) { + long size = buffer.getMemoryUsage(); + TagState state; + do { + state = getTagState(buffer); + } while (!reportRemoved(state, size)); + state = null; + do { + state = getParentTagState(buffer); + if (state == null) break; + } while (!reportRemoved(state, size)); + } + + private boolean reportRemoved(TagState state, long size) { + synchronized (state) { + if (state.isRemoved) return false; + --state.bufferCount; + assert state.bufferCount >= 0; + state.totalSize -= size; + if (state.bufferCount == 0) { + state.emptyTimeNs = System.nanoTime(); + } + } + return true; + } + + private TagState getTagState(LlapCacheableBuffer buffer) { + return getTagState(buffer.getTag()); + } + + private TagState getParentTagState(LlapCacheableBuffer buffer) { + String tag = buffer.getTag(); + int ix = 
tag.indexOf(LlapUtil.DERIVED_ENTITY_PARTITION_SEPARATOR); + if (ix <= 0) return null; + return getTagState(tag.substring(0, ix)); + } + + private TagState getTagState(String tag) { + TagState state = tagInfo.get(tag); + if (state == null) { + state = new TagState(tag); + TagState old = tagInfo.putIfAbsent(tag, state); + state = (old == null) ? state : old; + } + return state; + } + + + @Override + public void cache(LlapCacheableBuffer buffer, Priority priority) { + realPolicy.cache(buffer, priority); + reportCached(buffer); + } + + @Override + public void notifyLock(LlapCacheableBuffer buffer) { + realPolicy.notifyLock(buffer); + } + + @Override + public void notifyUnlock(LlapCacheableBuffer buffer) { + realPolicy.notifyUnlock(buffer); + } + + @Override + public void setEvictionListener(EvictionListener listener) { + evictionListener = listener; + } + + @Override + public void setParentDebugDumper(LlapOomDebugDump dumper) { + realPolicy.setParentDebugDumper(dumper); + } + + + @Override + public long evictSomeBlocks(long memoryToReserve) { + return realPolicy.evictSomeBlocks(memoryToReserve); + } + + @Override + public String debugDumpForOom() { + return realPolicy.debugDumpForOom(); + } + + @Override + public void debugDumpShort(StringBuilder sb) { + sb.append("\nCache state: "); + for (TagState state : tagInfo.values()) { + synchronized (state) { + sb.append("\n").append(state.name).append(": ").append(state.bufferCount).append("/") + .append(state.maxCount).append(", ").append(state.totalSize).append("/") + .append(state.maxSize); + } + } + realPolicy.debugDumpShort(sb); + } + + @Override + public void notifyEvicted(LlapCacheableBuffer buffer) { + evictionListener.notifyEvicted(buffer); + reportRemoved(buffer); + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java index e976090c31..f91a5d91a5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java @@ -56,5 +56,7 @@ public String toStringForCache() { + lastUpdate + " " + (isLocked() ? "!" : ".") + "]"; } + public abstract String getTag(); + protected abstract boolean isLocked(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java index 266f46e124..405fca2d4f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java @@ -24,9 +24,19 @@ /** ORC cache uses this to store compressed length; buffer is cached uncompressed, but * the lookup is on compressed ranges, so we need to know this. 
*/ public int declaredCachedLength = UNKNOWN_CACHED_LENGTH; + private String tag; @Override public void notifyEvicted(EvictionDispatcher evictionDispatcher) { evictionDispatcher.notifyEvicted(this); } + + public void setTag(String tag) { + this.tag = tag; + } + + @Override + public String getTag() { + return tag; + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java index a6330a3b82..af1b699771 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java @@ -58,7 +58,7 @@ DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, * the replacement chunks from cache are updated directly in the array. */ long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks, - long baseOffset, Priority priority, LowLevelCacheCounters qfCounters); + long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag); /** Notifies the cache that a particular buffer should be removed due to eviction. */ void notifyEvicted(MemoryBuffer buffer); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java index a5494c7457..5e102d93de 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java @@ -288,7 +288,7 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { @Override public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers, - long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) { + long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) { long[] result = null; assert buffers.length == ranges.length; FileCache> subCache = @@ -304,6 +304,7 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { long offset = ranges[i].getOffset() + baseOffset; assert buffer.declaredCachedLength == LlapDataBuffer.UNKNOWN_CACHED_LENGTH; buffer.declaredCachedLength = ranges[i].getLength(); + buffer.setTag(tag); while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). LlapDataBuffer oldVal = subCache.getCache().putIfAbsent(offset, buffer); if (oldVal == null) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java index 2659d9e951..b42f76184c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -153,7 +154,7 @@ public void notifyUnlock(LlapCacheableBuffer buffer) { } finally { listLock.unlock(); } - // Now insert the buffer in its place and restore heap property. + // Now insert the new buffer in its place and restore heap property. 
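An aside on the hive.llap.io.lrfu.lambda default change earlier in this patch (0.01 -> 0.000001): under the usual LRFU weighting this policy is based on, F(t) = 0.5^(lambda * t), lambda is the inverse of the halving time measured in time ticks, which is exactly what the amended description states. The arithmetic sketch below is not policy code, just that relationship spelled out for the new default:

public class LrfuLambdaSketch {
  public static void main(String[] args) {
    double lambda = 0.000001;                    // new default of hive.llap.io.lrfu.lambda
    long halvingTicks = Math.round(1d / lambda); // 1,000,000 cache operations
    double factor = Math.pow(0.5, lambda * halvingTicks);
    System.out.println("recency-frequency decays by " + factor + " after " + halvingTicks + " ticks");
    System.out.println("old default 0.01 halved it after only ~" + Math.round(1d / 0.01) + " ticks");
  }
}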
buffer.indexInHeap = 0; heapifyDownUnderLock(buffer, time); } else { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java index 08713916a6..cb89d12e80 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java @@ -55,10 +55,18 @@ public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer { public boolean isCached = false; + private String tag; @Override public void notifyEvicted(EvictionDispatcher evictionDispatcher) { evictionDispatcher.notifyEvicted(this); } + public void setTag(String tag) { + this.tag = tag; + } + @Override + public String getTag() { + return tag; + } } private static final class StripeInfoComparator implements @@ -491,7 +499,7 @@ private boolean lockBuffer(LlapSerDeDataBuffer buffer, boolean doNotifyPolicy) { } public void putFileData(final FileData data, Priority priority, - LowLevelCacheCounters qfCounters) { + LowLevelCacheCounters qfCounters, String tag) { // TODO: buffers are accounted for at allocation time, but ideally we should report the memory // overhead from the java objects to memory manager and remove it when discarding file. if (data.stripes == null || data.stripes.isEmpty()) { @@ -521,7 +529,7 @@ public FileData apply(Void input) { } try { for (StripeData si : data.stripes) { - lockAllBuffersForPut(si, priority); + lockAllBuffersForPut(si, priority, tag); } if (data == cached) { if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { @@ -566,7 +574,7 @@ public FileData apply(Void input) { } } - private void lockAllBuffersForPut(StripeData si, Priority priority) { + private void lockAllBuffersForPut(StripeData si, Priority priority, String tag) { for (int i = 0; i < si.data.length; ++i) { LlapSerDeDataBuffer[][] colData = si.data[i]; if (colData == null) continue; @@ -576,6 +584,7 @@ private void lockAllBuffersForPut(StripeData si, Priority priority) { for (int k = 0; k < streamData.length; ++k) { boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet assert canLock; + streamData[k].setTag(tag); cachePolicy.cache(streamData[k], priority); streamData[k].isCached = true; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java index 027e4149cd..a1b6caeb60 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java @@ -84,9 +84,8 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseO } @Override - public long[] putFileData(Object fileKey, DiskRange[] ranges, - MemoryBuffer[] chunks, long baseOffset, Priority priority, - LowLevelCacheCounters qfCounters) { + public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks, + long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) { for (int i = 0; i < chunks.length; ++i) { LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)chunks[i]; if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 5a397bed09..e5bc3c2bfa 100644 --- 
llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.cache.BuddyAllocator; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; +import org.apache.hadoop.hive.llap.cache.CacheContentsTracker; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; import org.apache.hadoop.hive.llap.cache.LlapDataBuffer; import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump; @@ -76,6 +77,7 @@ + import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -139,6 +141,10 @@ private LlapIoImpl(Configuration conf) throws IOException { int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy( minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy(); + boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE); + if (trackUsage) { + cachePolicy = new CacheContentsTracker(cachePolicy); + } // Allocator uses memory manager to request memory, so create the manager next. LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager( totalMemorySize, cachePolicy, cacheMetrics); @@ -256,7 +262,14 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, @Override public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset) { - return lowLevelCache.putFileData(fileKey, ranges, data, baseOffset, Priority.NORMAL, null); + return putFileData(fileKey, ranges, data, baseOffset, null); + } + + @Override + public long[] putFileData(Object fileKey, DiskRange[] ranges, + MemoryBuffer[] data, long baseOffset, String tag) { + return lowLevelCache.putFileData( + fileKey, ranges, data, baseOffset, Priority.NORMAL, null, tag); } @Override diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 458d9269cb..8841b955ef 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -22,7 +22,6 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; @@ -61,6 +60,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.LlapDataBuffer; import org.apache.hadoop.hive.llap.cache.LowLevelCache; @@ -74,16 +74,12 @@ import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; -import org.apache.hadoop.hive.ql.exec.DDLTask; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.orc.CompressionKind; import org.apache.orc.DataReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import 
org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; -import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.orc.OrcConf; -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; @@ -169,6 +165,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { private EncodedReader stripeReader; private CompressionCodec codec; private Object fileKey; + private final String cacheTag; private FileSystem fs; /** @@ -209,6 +206,8 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff // LlapInputFormat needs to know the file schema to decide if schema evolution is supported. orcReader = null; + cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) + ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null; // 1. Get file metadata from cache, or create the reader and read it. // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that fs = split.getPath().getFileSystem(jobConf); @@ -271,7 +270,8 @@ protected Void performDataRead() throws IOException, InterruptedException { recordReaderTime(startTime); return null; } - counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath())); + counters.setDesc(QueryFragmentCounters.Desc.TABLE, + LlapUtil.getDbAndTableNameForMetrics(split.getPath(), false)); counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath() + (fileKey == null ? "" : " (" + fileKey + ")")); try { @@ -432,7 +432,7 @@ private void ensureDataReader() throws IOException { // Reader creation updates HDFS counters, don't do it here. DataWrapperForOrc dw = new DataWrapperForOrc(); stripeReader = orcReader.encodedReader( - fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool); + fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag); stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled()); } @@ -440,49 +440,6 @@ private void recordReaderTime(long startTime) { counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime); } - private static String getDbAndTableName(Path path) { - // Ideally, we'd get this from split; however, split doesn't contain any such thing and it's - // actually pretty hard to get cause even split generator only uses paths. We only need this - // for metrics; therefore, brace for BLACK MAGIC! - String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR); - int dbIx = -1; - // Try to find the default db postfix; don't check two last components - at least there - // should be a table and file (we could also try to throw away partition/bucket/acid stuff). - for (int i = 0; i < parts.length - 2; ++i) { - if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue; - if (dbIx >= 0) { - dbIx = -1; // Let's not guess. - break; - } - dbIx = i; - } - if (dbIx >= 0) { - return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1]; - } - - // Just go from the back and throw away everything we think is wrong; skip last item, the file. 
- boolean isInPartFields = false; - for (int i = parts.length - 2; i >= 0; --i) { - String p = parts[i]; - boolean isPartField = p.contains("="); - if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX) - && !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) { - dbIx = i - 1; - break; - } - isInPartFields = isPartField; - } - // If we found something before we ran out of components, use it. - if (dbIx >= 0) { - String dbName = parts[dbIx]; - if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) { - dbName = dbName.substring(0, dbName.length() - 3); - } - return dbName + "." + parts[dbIx + 1]; - } - return "unknown"; - } - private void validateFileMetadata() throws IOException { if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return; int bufferSize = fileMetadata.getCompressionBufferSize(); @@ -528,6 +485,7 @@ private void cleanupReaders() { if (rawDataReader != null && isRawDataReaderOpen) { try { rawDataReader.close(); + rawDataReader = null; } catch (IOException ex) { // Ignore. } @@ -623,7 +581,7 @@ private OrcFileMetadata getFileFooterFromCacheOrDisk() throws IOException { ensureOrcReader(); ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter(); if (hasCache) { - tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb); + tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag); metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer. } FileTail ft = orcReader.getFileTail(); @@ -716,7 +674,7 @@ private OrcStripeMetadata createOrcStripeMetadataObject(int stripeIx, StripeInfo assert footerRange.next == null; // Can only happens w/zcr for a single input buffer. if (hasCache) { LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail( - stripeKey, footerRange.getData().duplicate()); + stripeKey, footerRange.getData().duplicate(), cacheTag); metadataCache.decRefBuffer(cacheBuf); // We don't use this one. 
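Both metadata-cache puts above (file footer and stripe tail) now carry cacheTag, which is computed once per split in the constructor. Below is a compile-level sketch of that pattern using only classes touched by this patch; the wrapper class and method are illustrative, not new API:

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;

class TaggedMetadataPutSketch {
  static void cacheFooter(MetadataCache metadataCache, Configuration daemonConf,
      Object fileKey, Path path, ByteBuffer tailBufferBb) {
    // The tag is only derived when tracking is enabled; a null tag keeps the old behavior.
    String cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
        ? LlapUtil.getDbAndTableNameForMetrics(path, true) : null;
    LlapBufferOrBuffers tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag);
    metadataCache.decRefBuffer(tailBuffers); // the put returns a locked buffer; release our ref
  }
}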
} ByteBuffer bb = footerRange.getData().duplicate(); @@ -947,9 +905,15 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, @Override public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset) { + return putFileData(fileKey, ranges, data, baseOffset, null); + } + + @Override + public long[] putFileData(Object fileKey, DiskRange[] ranges, + MemoryBuffer[] data, long baseOffset, String tag) { if (data != null) { return lowLevelCache.putFileData( - fileKey, ranges, data, baseOffset, Priority.NORMAL, counters); + fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag); } else if (metadataCache != null) { metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 506146b72a..8b89ae9ad4 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl; @@ -146,6 +147,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final Map parts; private final Object fileKey; + private final String cacheTag; private final FileSystem fs; private volatile boolean isStopped = false; @@ -212,6 +214,8 @@ public MemoryBuffer create() { fileKey = determineFileId(fs, split, HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID), HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID)); + cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) + ? 
LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null; this.sourceInputFormat = sourceInputFormat; this.sourceSerDe = sourceSerDe; this.reporter = reporter; @@ -735,7 +739,7 @@ public void cacheFileData(StripeData sd) { } FileData fd = new FileData(fileKey, encodings.length); fd.addStripe(sd); - cache.putFileData(fd, Priority.NORMAL, counters); + cache.putFileData(fd, Priority.NORMAL, counters, cacheTag); } else { lockAllBuffers(sd); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java index cfb3e42d7b..0184e3053f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.llap.io.metadata; import org.apache.hadoop.hive.common.FileUtils; - import org.apache.hadoop.hive.common.io.FileMetadataCache; import java.io.IOException; @@ -158,22 +157,33 @@ private LlapBufferOrBuffers getInternal(Object key) { @Override public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) { - return putInternal(fileKey, tailBuffer); + return putInternal(fileKey, tailBuffer, null); } - public LlapBufferOrBuffers putStripeTail(OrcBatchKey stripeKey, ByteBuffer tailBuffer) { - return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer); + @Override + public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag) { + return putInternal(fileKey, tailBuffer, tag); } + public LlapBufferOrBuffers putStripeTail( + OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag) { + return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag); + } @Override public LlapBufferOrBuffers putFileMetadata( Object fileKey, int length, InputStream is) throws IOException { + return putFileMetadata(fileKey, length, is, null); + } + + @Override + public LlapBufferOrBuffers putFileMetadata( + Object fileKey, int length, InputStream is, String tag) throws IOException { LlapBufferOrBuffers result = null; while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). 
LlapBufferOrBuffers oldVal = metadata.get(fileKey); if (oldVal == null) { - result = wrapBbForFile(result, fileKey, length, is); + result = wrapBbForFile(result, fileKey, length, is, tag); if (!lockBuffer(result, false)) { throw new AssertionError("Cannot lock a newly created value " + result); } @@ -194,14 +204,14 @@ public LlapBufferOrBuffers putFileMetadata( @SuppressWarnings({ "rawtypes", "unchecked" }) private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, - Object fileKey, int length, InputStream stream) throws IOException { + Object fileKey, int length, InputStream stream, String tag) throws IOException { if (result != null) return result; int maxAlloc = allocator.getMaxAllocation(); LlapMetadataBuffer[] largeBuffers = null; if (maxAlloc < length) { largeBuffers = new LlapMetadataBuffer[length / maxAlloc]; for (int i = 0; i < largeBuffers.length; ++i) { - largeBuffers[i] = new LlapMetadataBuffer(fileKey); + largeBuffers[i] = new LlapMetadataBuffer(fileKey, tag); } allocator.allocateMultiple(largeBuffers, maxAlloc, null); for (int i = 0; i < largeBuffers.length; ++i) { @@ -213,7 +223,7 @@ private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, return new LlapMetadataBuffers(largeBuffers); } else { LlapMetadataBuffer[] smallBuffer = new LlapMetadataBuffer[1]; - smallBuffer[0] = new LlapMetadataBuffer(fileKey); + smallBuffer[0] = new LlapMetadataBuffer(fileKey, tag); allocator.allocateMultiple(smallBuffer, length, null); readIntoCacheBuffer(stream, smallSize, smallBuffer[0]); if (largeBuffers == null) { @@ -239,12 +249,12 @@ private static void readIntoCacheBuffer( bb.position(pos); } - private LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer) { + private LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag) { LlapBufferOrBuffers result = null; while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). LlapBufferOrBuffers oldVal = metadata.get(key); if (oldVal == null) { - result = wrapBb(result, key, tailBuffer); + result = wrapBb(result, key, tailBuffer, tag); oldVal = metadata.putIfAbsent(key, result); if (oldVal == null) { cacheInPolicy(result); // Cached successfully, add to policy. @@ -302,17 +312,17 @@ public void decRefBuffer(MemoryBufferOrBuffers buffer) { } private LlapBufferOrBuffers wrapBb( - LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer) { + LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag) { if (result != null) return result; if (tailBuffer.remaining() <= allocator.getMaxAllocation()) { // The common case by far. 
- return wrapSmallBb(new LlapMetadataBuffer(key), tailBuffer); + return wrapSmallBb(new LlapMetadataBuffer(key, tag), tailBuffer); } else { int allocCount = determineAllocCount(tailBuffer); @SuppressWarnings("unchecked") LlapMetadataBuffer[] results = new LlapMetadataBuffer[allocCount]; for (int i = 0; i < allocCount; ++i) { - results[i] = new LlapMetadataBuffer(key); + results[i] = new LlapMetadataBuffer(key, tag); } wrapLargeBb(results, tailBuffer); return new LlapMetadataBuffers(results); @@ -470,9 +480,11 @@ public boolean equals(Object obj) { public final static class LlapMetadataBuffer extends LlapAllocatorBuffer implements LlapBufferOrBuffers { private final T key; + private String tag; - public LlapMetadataBuffer(T key) { + public LlapMetadataBuffer(T key, String tag) { this.key = key; + this.tag = tag; } @Override @@ -504,6 +516,11 @@ public LlapAllocatorBuffer getSingleLlapBuffer() { public LlapAllocatorBuffer[] getMultipleLlapBuffers() { return null; } + + @Override + public String getTag() { + return tag; + } } public final static class LlapMetadataBuffers implements LlapBufferOrBuffers { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java index fd8839a83a..2f7fa24558 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java @@ -121,4 +121,10 @@ public void notifyEvicted(EvictionDispatcher evictionDispatcher) { protected boolean isLocked() { return false; } + + @Override + public String getTag() { + // We don't care about these. + return "OrcEstimates"; + } } \ No newline at end of file diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java index 6a4b5986b7..2c87bc28e5 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java @@ -130,9 +130,9 @@ public void debugDumpShort(StringBuilder sb) { LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(), new DummyAllocator(), true, -1); // no cleanup thread final int FILE = 1; - cache.putFileData(FILE, gaps(3756206, 4261729, 7294767, 7547564), fbs(3), 0, Priority.NORMAL, null); - cache.putFileData(FILE, gaps(7790545, 11051556), fbs(1), 0, Priority.NORMAL, null); - cache.putFileData(FILE, gaps(11864971, 11912961, 13350968, 13393630), fbs(3), 0, Priority.NORMAL, null); + cache.putFileData(FILE, gaps(3756206, 4261729, 7294767, 7547564), fbs(3), 0, Priority.NORMAL, null, null); + cache.putFileData(FILE, gaps(7790545, 11051556), fbs(1), 0, Priority.NORMAL, null, null); + cache.putFileData(FILE, gaps(11864971, 11912961, 13350968, 13393630), fbs(3), 0, Priority.NORMAL, null, null); DiskRangeList dr = dr(3756206, 7313562); MutateHelper mh = new MutateHelper(dr); dr = dr.insertAfter(dr(7790545, 11051556)); @@ -149,14 +149,14 @@ public void testGetPut() { long fn1 = 1, fn2 = 2; MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() }; verifyRefcount(fakes, 1, 1, 1, 1, 1, 1); - assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null)); - assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3), 0, Priority.NORMAL, null)); + assertNull(cache.putFileData(fn1, drs(1, 2), 
fbs(fakes, 0, 1), 0, Priority.NORMAL, null, null)); + assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3), 0, Priority.NORMAL, null, null)); verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]); verifyCacheGet(cache, fn2, 1, 3, fakes[2], fakes[3]); verifyCacheGet(cache, fn1, 2, 4, fakes[1], dr(3, 4)); verifyRefcount(fakes, 3, 4, 3, 3, 1, 1); MemoryBuffer[] bufsDiff = fbs(fakes, 4, 5); - long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff, 0, Priority.NORMAL, null); + long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff, 0, Priority.NORMAL, null, null); assertEquals(1, mask.length); assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache. assertSame(fakes[0], bufsDiff[1]); // Should have been replaced @@ -207,7 +207,7 @@ public void testMultiMatch() { long fn = 1; MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb() }; assertNull(cache.putFileData( - fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null)); + fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null, null)); verifyCacheGet(cache, fn, 1, 9, dr(1, 2), fakes[0], dr(4, 6), fakes[1], dr(8, 9)); verifyCacheGet(cache, fn, 2, 8, fakes[0], dr(4, 6), fakes[1]); verifyCacheGet(cache, fn, 1, 5, dr(1, 2), fakes[0], dr(4, 5)); @@ -226,7 +226,7 @@ public void testMultiMatchNonGranular() { long fn = 1; MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb() }; assertNull(cache.putFileData( - fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null)); + fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null, null)); // We expect cache requests from the middle here verifyCacheGet(cache, fn, 3, 4, fakes[0]); verifyCacheGet(cache, fn, 3, 7, fakes[0], dr(4, 6), fakes[1]); @@ -239,8 +239,8 @@ public void testStaleValueGet() { new DummyAllocator(), true, -1); // no cleanup thread long fn1 = 1, fn2 = 2; MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb() }; - assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null)); - assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2), 0, Priority.NORMAL, null)); + assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null, null)); + assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2), 0, Priority.NORMAL, null, null)); verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]); verifyCacheGet(cache, fn2, 1, 2, fakes[2]); verifyRefcount(fakes, 3, 3, 3); @@ -259,15 +259,15 @@ public void testStaleValueReplace() { long fn1 = 1, fn2 = 2; MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() }; - assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2), 0, Priority.NORMAL, null)); - assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3), 0, Priority.NORMAL, null)); + assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2), 0, Priority.NORMAL, null, null)); + assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3), 0, Priority.NORMAL, null, null)); evict(cache, fakes[0]); evict(cache, fakes[3]); long[] mask = cache.putFileData( - fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7), 0, Priority.NORMAL, null); + fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7), 0, Priority.NORMAL, null, null); assertEquals(1, mask.length); assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and is stale; 4 doesn't - assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8), 0, Priority.NORMAL, null)); + assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8), 0, Priority.NORMAL, 
null, null)); verifyCacheGet(cache, fn1, 1, 5, fakes[4], fakes[1], fakes[2], fakes[7]); } @@ -304,7 +304,7 @@ public void testCacheMetrics() { long fn = 1; MemoryBuffer[] fakes = new MemoryBuffer[]{fb(), fb(), fb()}; cache.putFileData(fn, new DiskRange[]{dr(0, 100), dr(300, 500), dr(800, 1000)}, - fakes, 0, Priority.NORMAL, null); + fakes, 0, Priority.NORMAL, null, null); assertEquals(0, metrics.getCacheRequestedBytes()); assertEquals(0, metrics.getCacheHitBytes()); list = new CreateHelper(); @@ -390,7 +390,7 @@ public Long call() { buf.setNewAllocLocation(makeFakeArenaIndex(fileIndex, offsets[j]), 0); buffers[j] = buf; } - long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null); + long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null, null); puts += buffers.length; long maskVal = 0; if (mask != null) { diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java index 62a174ad08..c75dd70b9d 100644 --- ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java +++ ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java @@ -61,10 +61,10 @@ new ConcurrentHashMap<>(); public static Path registerFile(DataCache cache, Path path, Object fileKey, - TreeMap index, Configuration conf) throws IOException { + TreeMap index, Configuration conf, String tag) throws IOException { long splitId = currentSplitId.incrementAndGet(); CacheAwareInputStream stream = new CacheAwareInputStream( - cache, conf, index, path, fileKey, -1); + cache, conf, index, path, fileKey, -1, tag); if (files.putIfAbsent(splitId, stream) != null) { throw new IOException("Record already exists for " + splitId); } @@ -166,23 +166,25 @@ public boolean rename(Path arg0, Path arg1) throws IOException { private final TreeMap chunkIndex; private final Path path; private final Object fileKey; + private final String tag; private final Configuration conf; private final DataCache cache; private final int bufferSize; private long position = 0; public CacheAwareInputStream(DataCache cache, Configuration conf, - TreeMap chunkIndex, Path path, Object fileKey, int bufferSize) { + TreeMap chunkIndex, Path path, Object fileKey, int bufferSize, String tag) { this.cache = cache; this.fileKey = fileKey; this.chunkIndex = chunkIndex; this.path = path; this.conf = conf; this.bufferSize = bufferSize; + this.tag = tag; } public LlapCacheAwareFs.CacheAwareInputStream cloneWithBufferSize(int bufferSize) { - return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize); + return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize, tag); } @Override @@ -307,7 +309,7 @@ public DiskRangeList createCacheChunk( } smallBuffer = null; } - cache.putFileData(fileKey, cacheRanges, newCacheData, 0); + cache.putFileData(fileKey, cacheRanges, newCacheData, 0, tag); } finally { // We do not use the new cache buffers for the actual read, given the way read() API is. // Therefore, we don't need to handle cache collisions - just decref all the buffers. 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 462c62f52b..4e17394653 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -62,6 +62,8 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace.RangesSrc; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory; +import org.apache.hadoop.io.compress.zlib.ZlibDecompressor; +import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor; import org.apache.orc.OrcProto; import com.google.common.annotations.VisibleForTesting; @@ -130,7 +132,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final Object fileKey; private final DataReader dataReader; private boolean isDataReaderOpen = false; - private final CompressionCodec codec; + private CompressionCodec codec; private final boolean isCodecFromPool; private boolean isCodecFailure = false; private final boolean isCompressed; @@ -143,11 +145,12 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final IoTrace trace; private final TypeDescription fileSchema; private final WriterVersion version; + private final String tag; public EncodedReaderImpl(Object fileKey, List types, TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version, int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader, - PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException { + PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException { this.fileKey = fileKey; this.compressionKind = kind; this.isCompressed = kind != org.apache.orc.CompressionKind.NONE; @@ -161,6 +164,7 @@ public EncodedReaderImpl(Object fileKey, List types, this.cacheWrapper = cacheWrapper; this.dataReader = dataReader; this.trace = trace; + this.tag = tag; if (POOLS != null) return; if (pf == null) { pf = new NoopPoolFactory(); @@ -686,6 +690,7 @@ public void close() throws IOException { } else { codec.close(); } + codec = null; } catch (Exception ex) { LOG.error("Ignoring error from codec", ex); } finally { @@ -849,7 +854,7 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon if (badEstimates != null && !badEstimates.isEmpty()) { // Relies on the fact that cache does not actually store these. DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]); - long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset); + long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset, tag); assert result == null; // We don't expect conflicts from bad estimates. } @@ -909,7 +914,7 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon // 6. Finally, put uncompressed data to cache. if (fileKey != null) { long[] collisionMask = cacheWrapper.putFileData( - fileKey, cacheKeys, targetBuffers, baseOffset); + fileKey, cacheKeys, targetBuffers, baseOffset, tag); processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers()); } @@ -1163,7 +1168,8 @@ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList s // 5. Put uncompressed data to cache. 
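The cacheWrapper.putFileData calls in readEncodedStream and preReadUncompressedStream now pass the tag through to the data cache; the return contract (null, or a bitmask of chunks that collided with already-cached data) is unchanged. A small decoding sketch with a hypothetical mask value follows; the real handling lives in processCacheCollisions:

public class CollisionMaskSketch {
  public static void main(String[] args) {
    // Hypothetical result: bits 1 and 2 (zero-based) set means the chunks at positions 1 and 2
    // were already cached, and the corresponding array entries were replaced with cached buffers.
    long[] mask = new long[] { 6L };
    for (int i = 0; i < 64 * mask.length; ++i) {
      if ((mask[i >>> 6] & (1L << (i & 63))) != 0) {
        System.out.println("chunk " + i + ": use the replacement buffer from the cache");
      }
    }
  }
}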
if (fileKey != null) { - long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset); + long[] collisionMask = cacheWrapper.putFileData( + fileKey, cacheKeys, targetBuffers, baseOffset, tag); processCacheCollisions(collisionMask, toCache, targetBuffers, null); } @@ -1261,6 +1267,7 @@ private static void decompressChunk( } codec.reset(); // We always need to call reset on the codec. codec.decompress(src, dest); + dest.position(startPos); int newLim = dest.limit(); if (newLim > startLim) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java index 57fb63b463..210c987b7f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java @@ -46,7 +46,7 @@ * @return The reader. */ EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader, - PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException; + PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException; /** The factory that can create (or return) the pools used by encoded reader. */ public interface PoolFactory { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java index 49cd9bac2d..a9a9f10194 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java @@ -35,8 +35,8 @@ public ReaderImpl(Path path, ReaderOptions options) throws IOException { @Override public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader, - PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException { + PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException { return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(), - bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool); + bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 7b77eeeeac..f64efe26f5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -25,10 +25,10 @@ import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.FileMetadataCache; import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapCacheAwareFs; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; @@ -188,6 +188,7 @@ public void initialize( // if task.side.metadata is set, rowGroupOffsets is null Object cacheKey = null; + String cacheTag = null; // TODO: also support fileKey in splits, like OrcSplit does if (metadataCache != null) { cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file, @@ -195,6 +196,9 @@ public void 
initialize( HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID)); } if (cacheKey != null) { + if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) { + cacheTag = LlapUtil.getDbAndTableNameForMetrics(file, true); + } // If we are going to use cache, change the path to depend on file ID for extra consistency. FileSystem fs = file.getFileSystem(configuration); if (cacheKey instanceof Long && HiveConf.getBoolVar( @@ -207,13 +211,13 @@ public void initialize( //TODO check whether rowGroupOffSets can be null // then we need to apply the predicate push down filter footer = readSplitFooter( - configuration, file, cacheKey, range(split.getStart(), split.getEnd())); + configuration, file, cacheKey, range(split.getStart(), split.getEnd()), cacheTag); MessageType fileSchema = footer.getFileMetaData().getSchema(); FilterCompat.Filter filter = getFilter(configuration); blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema); } else { // otherwise we find the row groups that were selected on the client - footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER); + footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER, cacheTag); Set offsets = new HashSet<>(); for (long offset : rowGroupOffsets) { offsets.add(offset); @@ -250,13 +254,13 @@ public void initialize( requestedSchema = DataWritableReadSupport .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration); - Path path = wrapPathForCache(file, cacheKey, configuration, blocks); + Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag); this.reader = new ParquetFileReader( configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns()); } private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration, - List blocks) throws IOException { + List blocks, String tag) throws IOException { if (fileKey == null || cache == null) { return path; } @@ -277,13 +281,13 @@ private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration, // Register the cache-aware path so that Parquet reader would go thru it. configuration.set("fs." + LlapCacheAwareFs.SCHEME + ".impl", LlapCacheAwareFs.class.getCanonicalName()); - path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration); + path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration, tag); this.cacheFsPath = path; return path; } - private ParquetMetadata readSplitFooter( - JobConf configuration, final Path file, Object cacheKey, MetadataFilter filter) throws IOException { + private ParquetMetadata readSplitFooter(JobConf configuration, final Path file, + Object cacheKey, MetadataFilter filter, String tag) throws IOException { MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? 
null : metadataCache.getFileMetadata(cacheKey); if (footerData != null) { @@ -313,7 +317,7 @@ private ParquetMetadata readSplitFooter( if (LOG.isInfoEnabled()) { LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey); } - footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream); + footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag); try { return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter); } finally { diff --git storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java index 795739bfe8..2ac0a18a50 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java +++ storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java @@ -60,25 +60,7 @@ DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData); - /** - * Puts file data into cache, or gets older data in case of collisions. - * - * The memory buffers provided MUST be allocated via an allocator returned by getAllocator - * method, to allow cache implementations that evict and then de-allocate the buffer. - * - * It is assumed that the caller will use the data immediately, therefore any buffers provided - * to putFileData (or returned due to cache collision) are locked in cache to prevent eviction, - * and must therefore be released back to cache via a corresponding call (releaseBuffer) when the - * caller is done with it. Buffers rejected due to conflict will neither be locked, nor - * automatically deallocated. The caller must take care to discard these buffers. - * - * @param fileKey Unique ID of the target file on the file system. - * @param ranges The ranges for which the data is being cached. These objects will not be stored. - * @param data The data for the corresponding ranges. - * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC). - * @return null if all data was put; bitmask indicating which chunks were not put otherwise; - * the replacement chunks from cache are updated directly in the array. - */ + @Deprecated long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset); /** @@ -106,4 +88,27 @@ DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, * @return the factory */ Allocator.BufferObjectFactory getDataBufferFactory(); + + + /** + * Puts file data into cache, or gets older data in case of collisions. + * + * The memory buffers provided MUST be allocated via an allocator returned by getAllocator + * method, to allow cache implementations that evict and then de-allocate the buffer. + * + * It is assumed that the caller will use the data immediately, therefore any buffers provided + * to putFileData (or returned due to cache collision) are locked in cache to prevent eviction, + * and must therefore be released back to cache via a corresponding call (releaseBuffer) when the + * caller is done with it. Buffers rejected due to conflict will neither be locked, nor + * automatically deallocated. The caller must take care to discard these buffers. + * + * @param fileKey Unique ID of the target file on the file system. + * @param ranges The ranges for which the data is being cached. These objects will not be stored. + * @param data The data for the corresponding ranges. + * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC). 
+ * @return null if all data was put; bitmask indicating which chunks were not put otherwise; + * the replacement chunks from cache are updated directly in the array. + */ + long[] putFileData(Object fileKey, DiskRange[] ranges, + MemoryBuffer[] data, long baseOffset, String tag); } diff --git storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java index b417bd36de..d1da7f5de8 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java +++ storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java @@ -32,17 +32,11 @@ */ MemoryBufferOrBuffers getFileMetadata(Object fileKey); - /** - * Puts the metadata for a given file (e.g. a footer buffer into cache). - * @param fileKey The file key. - * @param length The footer length. - * @param is The stream to read the footer from. - * @return The buffer or buffers representing the cached footer. - * The caller must decref this buffer when done. - */ + @Deprecated MemoryBufferOrBuffers putFileMetadata( Object fileKey, int length, InputStream is) throws IOException; + @Deprecated MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer); /** @@ -50,4 +44,18 @@ MemoryBufferOrBuffers putFileMetadata( * @param buffer The buffer to release. */ void decRefBuffer(MemoryBufferOrBuffers buffer); + + + /** + * Puts the metadata for a given file (e.g. a footer buffer into cache). + * @param fileKey The file key. + * @param length The footer length. + * @param is The stream to read the footer from. + * @return The buffer or buffers representing the cached footer. + * The caller must decref this buffer when done. + */ + MemoryBufferOrBuffers putFileMetadata( + Object fileKey, int length, InputStream is, String tag) throws IOException; + + MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag); } \ No newline at end of file
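
For DataCache implementations outside this patch, one way to absorb the interface change is to keep the now-deprecated overload and forward it to the tagged one with a null tag, which is the pattern LlapIoImpl, the ORC DataWrapperForOrc, and MetadataCache follow above. A sketch (the class name is illustrative):

import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;

abstract class TagForwardingDataCache implements DataCache {
  @Override
  @Deprecated
  public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data,
      long baseOffset) {
    // Legacy callers get untagged entries; tagged callers use the new five-argument overload.
    return putFileData(fileKey, ranges, data, baseOffset, null);
  }
}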