diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index a351a193c6bc558bb420049c54b7657cd7d04b7c..c26ab629e98bd11a82e7746c486ae719a2cd3c5c 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -314,8 +314,7 @@ public static String humanReadableByteCount(long bytes) { DELETE_DELTA_PREFIX = "delete_delta_", BUCKET_PREFIX = "bucket_", DATABASE_PATH_SUFFIX = ".db", UNION_SUDBIR_PREFIX = "HIVE_UNION_SUBDIR_"; - public static final char DERIVED_ENTITY_PARTITION_SEPARATOR = '/'; - + @Deprecated public static String getDbAndTableNameForMetrics(Path path, boolean includeParts) { String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR); int dbIx = -1; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java index 64c0125833af100fd7012b9751d075ab536ad1b0..733b30c7dda5ca051fb7950b3944620e026e0bc4 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java @@ -18,19 +18,24 @@ package org.apache.hadoop.hive.llap.cache; +import java.util.ArrayList; import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; -import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; +import static java.util.stream.Collectors.joining; + /** * A wrapper around cache eviction policy that tracks cache contents via tags. 
*/ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListener { private static final long CLEANUP_TIME_MS = 3600 * 1000L, MIN_TIME_MS = 300 * 1000L; - private final ConcurrentSkipListMap<String, TagState> tagInfo = new ConcurrentSkipListMap<>(); + private final ConcurrentSkipListMap<CacheTag, TagState> tagInfo = new ConcurrentSkipListMap<>(); private EvictionListener evictionListener; private LowLevelCachePolicy realPolicy; private final Thread cleanupThread; @@ -75,56 +80,37 @@ public void run() { } private static class TagState { - public TagState(String name) { - this.name = name; + TagState(CacheTag cacheTag) { + this.cacheTag = cacheTag; } - public final String name; + public final CacheTag cacheTag; public long emptyTimeNs; public long bufferCount, totalSize, maxCount, maxSize; - public boolean isRemoved = false; } - private void reportCached(LlapCacheableBuffer buffer) { long size = buffer.getMemoryUsage(); - TagState state; - do { - state = getTagState(buffer); - } while (!reportCached(state, size)); - state = null; - do { - state = getParentTagState(buffer); - if (state == null) break; - } while (!reportCached(state, size)); - } - - private boolean reportCached(TagState state, long size) { + TagState state = getTagState(buffer); + reportCached(state, size); + } + + private void reportCached(TagState state, long size) { synchronized (state) { - if (state.isRemoved) return false; ++state.bufferCount; state.totalSize += size; state.maxSize = Math.max(state.maxSize, state.totalSize); state.maxCount = Math.max(state.maxCount, state.bufferCount); } - return true; } private void reportRemoved(LlapCacheableBuffer buffer) { long size = buffer.getMemoryUsage(); - TagState state; - do { - state = getTagState(buffer); - } while (!reportRemoved(state, size)); - state = null; - do { - state = getParentTagState(buffer); - if (state == null) break; - } while (!reportRemoved(state, size)); - } - - private boolean reportRemoved(TagState state, long size) { + TagState state = getTagState(buffer); + reportRemoved(state, size); + } + + private void reportRemoved(TagState state, long size) { synchronized (state) { - if (state.isRemoved) return false; --state.bufferCount; assert state.bufferCount >= 0; state.totalSize -= size; @@ -132,21 +118,13 @@ private boolean reportRemoved(TagState state, long size) { state.emptyTimeNs = System.nanoTime(); } } - return true; } private TagState getTagState(LlapCacheableBuffer buffer) { return getTagState(buffer.getTag()); } - private TagState getParentTagState(LlapCacheableBuffer buffer) { - String tag = buffer.getTag(); - int ix = tag.indexOf(LlapUtil.DERIVED_ENTITY_PARTITION_SEPARATOR); - if (ix <= 0) return null; - return getTagState(tag.substring(0, ix)); - } - - private TagState getTagState(String tag) { + private TagState getTagState(CacheTag tag) { TagState state = tagInfo.get(tag); if (state == null) { state = new TagState(tag); @@ -191,14 +169,51 @@ public long evictSomeBlocks(long memoryToReserve) { @Override public void debugDumpShort(StringBuilder sb) { - sb.append("\nCache state: "); + ArrayList<String> endResult = new ArrayList<>(); + Map<CacheTag, TagState> summaries = new TreeMap<>(); + for (TagState state : tagInfo.values()) { synchronized (state) { - sb.append("\n").append(state.name).append(": ").append(state.bufferCount).append("/") - .append(state.maxCount).append(", ").append(state.totalSize).append("/") - .append(state.maxSize); + endResult.add(unsafePrintTagState(state)); + + // Handle summary calculation + CacheTag parentTag = CacheTag.createParentCacheTag(state.cacheTag); + while
(parentTag != null) { + if (!summaries.containsKey(parentTag)) { + summaries.put(parentTag, new TagState(parentTag)); + } + TagState parentState = summaries.get(parentTag); + parentState.bufferCount += state.bufferCount; + parentState.maxCount += state.maxCount; + parentState.totalSize += state.totalSize; + parentState.maxSize += state.maxSize; + parentTag = CacheTag.createParentCacheTag(parentTag); + } } } + for (TagState state : summaries.values()) { + endResult.add(unsafePrintTagState(state)); + } + sb.append("\nCache state: \n"); + sb.append(endResult.stream().sorted().collect(joining("\n"))); + } + + /** + * Constructs a String by pretty printing a TagState instance - for Web UI consumption. + * Note: does not lock on TagState instance. + * @param state + * @return + */ + private String unsafePrintTagState(TagState state) { + StringBuilder sb = new StringBuilder(); + sb.append(state.cacheTag.getTableName()); + if (state.cacheTag instanceof CacheTag.PartitionCacheTag) { + sb.append("/").append(String.join("/", + ((CacheTag.PartitionCacheTag) state.cacheTag).partitionDescToString())); + } + sb.append(" : ").append(state.bufferCount).append("/").append(state.maxCount).append(", ") + .append(state.totalSize).append("/").append(state.maxSize); + return sb.toString(); } @Override @@ -206,4 +221,5 @@ public void notifyEvicted(LlapCacheableBuffer buffer) { evictionListener.notifyEvicted(buffer); reportRemoved(buffer); } + } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java index f91a5d91a5b739dcbee98a1485ad4c59f6a9057b..05260332d50ce3ae1a8e21da4bab3086286e9b7c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.llap.cache; +import org.apache.hadoop.hive.common.io.CacheTag; + /** * Buffer that can be managed by LowLevelEvictionPolicy. * We want to have cacheable and non-allocator buffers, as well as allocator buffers with no @@ -56,7 +58,7 @@ public String toStringForCache() { + lastUpdate + " " + (isLocked() ? "!" : ".") + "]"; } - public abstract String getTag(); + public abstract CacheTag getTag(); protected abstract boolean isLocked(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java index 405fca2d4fae9fe0e3fd6d6d1345d55255d6df78..3d5e08e0309a2d0e1f533c53409be4e0913c160d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java @@ -18,25 +18,27 @@ package org.apache.hadoop.hive.llap.cache; +import org.apache.hadoop.hive.common.io.CacheTag; + public final class LlapDataBuffer extends LlapAllocatorBuffer { public static final int UNKNOWN_CACHED_LENGTH = -1; /** ORC cache uses this to store compressed length; buffer is cached uncompressed, but * the lookup is on compressed ranges, so we need to know this. 
*/ public int declaredCachedLength = UNKNOWN_CACHED_LENGTH; - private String tag; + private CacheTag tag; @Override public void notifyEvicted(EvictionDispatcher evictionDispatcher) { evictionDispatcher.notifyEvicted(this); } - public void setTag(String tag) { + public void setTag(CacheTag tag) { this.tag = tag; } @Override - public String getTag() { + public CacheTag getTag() { return tag; } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java index 4dd3826a67dfff66ce9c90027d61a9012c0a15e8..f780b29b346919fb7a4b7e28c15b596c575b55ba 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap.cache; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; @@ -58,7 +59,7 @@ DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, * the replacement chunks from cache are updated directly in the array. */ long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks, - long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag); + long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag); /** Notifies the cache that a particular buffer should be removed due to eviction. */ void notifyEvicted(MemoryBuffer buffer); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java index 62d7e5534486b53634de332875c5fd5d336c29b4..5d0f2aba10f229df75b514c0bd42386ccf47f9bd 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.llap.cache; -import org.apache.orc.impl.RecordReaderUtils; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; @@ -29,19 +27,20 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hive.common.io.Allocator; -import org.apache.hadoop.hive.common.io.DiskRange; -import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hive.common.util.Ref; +import org.apache.orc.impl.RecordReaderUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Joiner; public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapIoDebugDump { private static final int DEFAULT_CLEANUP_INTERVAL = 600; @@ -290,7 +289,7 @@ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) { @Override public long[] 
putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers, - long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) { + long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag) { long[] result = null; assert buffers.length == ranges.length; FileCache> subCache = diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java index 2a39d2d32807a51346baad28b04d87670381b6d5..7930fd9ff5c0dc5fd1464d999c18097e3629a77c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; @@ -75,16 +76,16 @@ public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer { public boolean isCached = false; - private String tag; + private CacheTag tag; @Override public void notifyEvicted(EvictionDispatcher evictionDispatcher) { evictionDispatcher.notifyEvicted(this); } - public void setTag(String tag) { + public void setTag(CacheTag tag) { this.tag = tag; } @Override - public String getTag() { + public CacheTag getTag() { return tag; } } @@ -523,7 +524,7 @@ private boolean lockBuffer(LlapSerDeDataBuffer buffer, boolean doNotifyPolicy) { } public void putFileData(final FileData data, Priority priority, - LowLevelCacheCounters qfCounters, String tag) { + LowLevelCacheCounters qfCounters, CacheTag tag) { // TODO: buffers are accounted for at allocation time, but ideally we should report the memory // overhead from the java objects to memory manager and remove it when discarding file. 
if (data.stripes == null || data.stripes.isEmpty()) { @@ -598,7 +599,7 @@ public FileData apply(Void input) { } } - private void lockAllBuffersForPut(StripeData si, Priority priority, String tag) { + private void lockAllBuffersForPut(StripeData si, Priority priority, CacheTag tag) { for (int i = 0; i < si.data.length; ++i) { LlapSerDeDataBuffer[][] colData = si.data[i]; if (colData == null) continue; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java index 41855e171eaa5bf8da638bc62bce3d0d49dc4bae..b2f606a6aac143e1f4a2b181f3cd71df7e36f0ff 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; import org.apache.hadoop.hive.common.io.DiskRange; @@ -85,7 +86,7 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseO @Override public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks, - long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) { + long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag) { for (int i = 0; i < chunks.length; ++i) { LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)chunks[i]; if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index c63ee5f79b4f9fc356f033960e0af1a7b0058038..0d9077c3688fbe0c9939dfd308ca6bee946aef87 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -28,6 +28,7 @@ import javax.management.ObjectName; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -299,7 +300,7 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, @Override public long[] putFileData(Object fileKey, DiskRange[] ranges, - MemoryBuffer[] data, long baseOffset, String tag) { + MemoryBuffer[] data, long baseOffset, CacheTag tag) { return lowLevelCache.putFileData( fileKey, ranges, data, baseOffset, Priority.NORMAL, null, tag); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index 1378a01f44ef774a15f769460833064c6305b2d6..8cc81ccbffed05db5bff34942a2083d61687cc6a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -27,11 +27,11 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import 
org.apache.hadoop.hive.llap.ConsumerFeedback; +import org.apache.hadoop.hive.llap.LlapHiveUtils; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; @@ -41,8 +41,6 @@ import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory; import org.apache.hadoop.hive.llap.io.decode.ReadPipeline; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -55,7 +53,6 @@ import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; @@ -75,8 +72,6 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import com.google.common.collect.Lists; - class LlapRecordReader implements RecordReader, Consumer { private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class); @@ -116,7 +111,7 @@ public static LlapRecordReader create(JobConf job, FileSplit split, List tableIncludedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, InputFormat sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, Configuration daemonConf) throws IOException, HiveException { - MapWork mapWork = findMapWork(job); + MapWork mapWork = LlapHiveUtils.findMapWork(job); if (mapWork == null) return null; // No compatible MapWork. LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, tableIncludedCols, hostName, cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf); @@ -302,37 +297,6 @@ static int determineQueueLimit(long maxBufferedSize, return Math.max(bestEffortSize, queueLimitMin); } - - private static MapWork findMapWork(JobConf job) throws HiveException { - String inputName = job.get(Utilities.INPUT_NAME, null); - if (LOG.isDebugEnabled()) { - LOG.debug("Initializing for input " + inputName); - } - String prefixes = job.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); - if (prefixes != null && !StringUtils.isBlank(prefixes)) { - // Currently SMB is broken, so we cannot check if it's compatible with IO elevator. - // So, we don't use the below code that would get the correct MapWork. See HIVE-16985. - return null; - } - - BaseWork work = null; - // HIVE-16985: try to find the fake merge work for SMB join, that is really another MapWork. 
- if (inputName != null) { - if (prefixes == null || - !Lists.newArrayList(prefixes.split(",")).contains(inputName)) { - inputName = null; - } - } - if (inputName != null) { - work = Utilities.getMergeWork(job, inputName); - } - - if (!(work instanceof MapWork)) { - work = Utilities.getMapWork(job); - } - return (MapWork) work; - } - /** * Starts the data read pipeline */ diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 2a0c5ca92f3c7431f3c399f309a538f47eb27597..17c4821ec67b0bc5b52ac8460f56c1f61ee0a2e7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -88,12 +88,12 @@ public ReadPipeline createReadPipeline( Consumer consumer, FileSplit split, Includes includes, SearchArgument sarg, QueryFragmentCounters counters, SchemaEvolutionFactory sef, InputFormat unused0, Deserializer unused1, Reporter reporter, JobConf job, - Map unused2) throws IOException { + Map parts) throws IOException { cacheMetrics.incrCacheReadRequests(); OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer( consumer, includes, _skipCorrupt, counters, ioMetrics); OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, - metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool); + metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool, parts); edc.init(reader, reader, reader.getTrace()); return edc; } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 85a42f945624c3ca468790772f52363b4064d8fc..3fcf0dc064f26d5e85a3a035e4e0b3b29638768c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -32,6 +33,7 @@ import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; import org.apache.hadoop.hive.common.io.Allocator; import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; @@ -41,7 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; -import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.LlapHiveUtils; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.LlapDataBuffer; import org.apache.hadoop.hive.llap.cache.LowLevelCache; @@ -70,6 +72,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.mapred.FileSplit; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.FixedSizedObjectPool; @@ -167,7 +170,8 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { private EncodedReader stripeReader; private CompressionCodec codec; private Object fileKey; - private final String cacheTag; + private final CacheTag cacheTag; + private final Map parts; private Utilities.SupplierWithCheckedException fsSupplier; @@ -187,7 +191,8 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager, MetadataCache metadataCache, Configuration daemonConf, Configuration jobConf, FileSplit split, Includes includes, SearchArgument sarg, OrcEncodedDataConsumer consumer, - QueryFragmentCounters counters, SchemaEvolutionFactory sef, Pool tracePool) + QueryFragmentCounters counters, SchemaEvolutionFactory sef, Pool tracePool, + Map parts) throws IOException { this.lowLevelCache = lowLevelCache; this.metadataCache = metadataCache; @@ -199,6 +204,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff this.counters = counters; this.trace = tracePool.take(); this.tracePool = tracePool; + this.parts = parts; try { this.ugi = UserGroupInformation.getCurrentUser(); } catch (IOException e) { @@ -210,7 +216,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff // LlapInputFormat needs to know the file schema to decide if schema evolution is supported. orcReader = null; cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) - ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null; + ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null; // 1. Get file metadata from cache, or create the reader and read it. // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf); @@ -278,7 +284,7 @@ protected Void performDataRead() throws IOException, InterruptedException { return null; } counters.setDesc(QueryFragmentCounters.Desc.TABLE, - LlapUtil.getDbAndTableNameForMetrics(split.getPath(), false)); + LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), false, parts)); counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath() + (fileKey == null ? 
"" : " (" + fileKey + ")")); try { @@ -927,7 +933,7 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, @Override public long[] putFileData(Object fileKey, DiskRange[] ranges, - MemoryBuffer[] data, long baseOffset, String tag) { + MemoryBuffer[] data, long baseOffset, CacheTag tag) { if (data != null) { return lowLevelCache.putFileData( fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index d414b1405b7672767196b3eaad02baa516169288..c73ba2c6e9e4cf234017b60086b3a11fdf3a2f36 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.llap.io.encoded; -import org.apache.orc.impl.MemoryManager; - import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; @@ -37,16 +35,17 @@ import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; import org.apache.hadoop.hive.common.io.Allocator; import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; -import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; +import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; -import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.LlapHiveUtils; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl; @@ -89,14 +88,15 @@ import org.apache.orc.CompressionCodec; import org.apache.orc.CompressionKind; import org.apache.orc.OrcConf; -import org.apache.orc.OrcUtils; import org.apache.orc.OrcFile.EncodingStrategy; import org.apache.orc.OrcFile.Version; import org.apache.orc.OrcProto; import org.apache.orc.OrcProto.ColumnEncoding; -import org.apache.orc.TypeDescription; +import org.apache.orc.OrcUtils; import org.apache.orc.PhysicalWriter; import org.apache.orc.PhysicalWriter.OutputReceiver; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.MemoryManager; import org.apache.orc.impl.SchemaEvolution; import org.apache.orc.impl.StreamName; import org.apache.tez.common.CallableWithNdc; @@ -149,7 +149,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final Map parts; private final Object fileKey; - private final String cacheTag; + private final CacheTag cacheTag; private final FileSystem fs; private AtomicBoolean isStopped = new AtomicBoolean(false); @@ -219,7 +219,7 @@ public MemoryBuffer create() { HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID), !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)); cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) - ? 
LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null; + ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null; this.sourceInputFormat = sourceInputFormat; this.sourceSerDe = sourceSerDe; this.reporter = reporter; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java index 8400fe98411ed07bd525a51a223fc35423136efb..10bd7364868d32ca886bafb8e450e810322d3546 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java @@ -34,8 +34,8 @@ import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.llap.cache.BuddyAllocator; -import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer; import org.apache.hadoop.hive.llap.cache.LlapIoDebugDump; @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; -import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { private final ConcurrentHashMap metadata = @@ -173,7 +172,7 @@ public MemoryBufferOrBuffers putFileMetadata(Object fileKey, @Override public MemoryBufferOrBuffers putFileMetadata(Object fileKey, - ByteBuffer tailBuffer, String tag) { + ByteBuffer tailBuffer, CacheTag tag) { return putInternal(fileKey, tailBuffer, tag, null); } @@ -184,26 +183,26 @@ public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, } public LlapBufferOrBuffers putStripeTail( - OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { + OrcBatchKey stripeKey, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) { return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag, isStopped); } @Override public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, - InputStream is, String tag) throws IOException { + InputStream is, CacheTag tag) throws IOException { return putFileMetadata(fileKey, length, is, tag, null); } @Override public LlapBufferOrBuffers putFileMetadata(Object fileKey, - ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { + ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) { return putInternal(fileKey, tailBuffer, tag, isStopped); } @Override public LlapBufferOrBuffers putFileMetadata(Object fileKey, int length, InputStream is, - String tag, AtomicBoolean isStopped) throws IOException { + CacheTag tag, AtomicBoolean isStopped) throws IOException { LlapBufferOrBuffers result = null; while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). 
LlapBufferOrBuffers oldVal = metadata.get(fileKey); @@ -229,7 +228,7 @@ public LlapBufferOrBuffers putFileMetadata(Object fileKey, int length, InputStre @SuppressWarnings({ "rawtypes", "unchecked" }) private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, - Object fileKey, int length, InputStream stream, String tag, AtomicBoolean isStopped) throws IOException { + Object fileKey, int length, InputStream stream, CacheTag tag, AtomicBoolean isStopped) throws IOException { if (result != null) return result; int maxAlloc = allocator.getMaxAllocation(); LlapMetadataBuffer[] largeBuffers = null; @@ -274,7 +273,7 @@ private static void readIntoCacheBuffer( bb.position(pos); } - private LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { + private LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) { LlapBufferOrBuffers result = null; while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). LlapBufferOrBuffers oldVal = metadata.get(key); @@ -337,7 +336,7 @@ public void decRefBuffer(MemoryBufferOrBuffers buffer) { } private LlapBufferOrBuffers wrapBb( - LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { + LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) { if (result != null) return result; if (tailBuffer.remaining() <= allocator.getMaxAllocation()) { // The common case by far. @@ -507,9 +506,9 @@ public boolean equals(Object obj) { public final static class LlapMetadataBuffer extends LlapAllocatorBuffer implements LlapBufferOrBuffers { private final T key; - private String tag; + private CacheTag tag; - public LlapMetadataBuffer(T key, String tag) { + public LlapMetadataBuffer(T key, CacheTag tag) { this.key = key; this.tag = tag; } @@ -545,7 +544,7 @@ public LlapAllocatorBuffer getSingleLlapBuffer() { } @Override - public String getTag() { + public CacheTag getTag() { return tag; } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java index 30dc1b9da2002689b8b1917f46ae3ca24194f3be..33e16802df2a3ba46a4817a381ef120afb1e5969 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; import org.apache.hadoop.hive.ql.io.SyntheticFileId; @@ -123,8 +124,8 @@ protected boolean isLocked() { } @Override - public String getTag() { + public CacheTag getTag() { // We don't care about these. 
- return "OrcEstimates"; + return CacheTag.build("OrcEstimates"); } } \ No newline at end of file diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java new file mode 100644 index 0000000000000000000000000000000000000000..1d242e02707e0b1bc42b904730ad11abcbea53b2 --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.cache; + +import java.util.Arrays; +import java.util.LinkedList; + +import org.apache.hadoop.hive.common.io.CacheTag; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static java.util.stream.Collectors.toCollection; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for TestCacheContentsTracker functions. + */ +public class TestCacheContentsTracker { + + private static CacheContentsTracker tracker; + + @BeforeClass + public static void setup() { + LowLevelCachePolicy lowLevelCachePolicyMock = mock(LowLevelCachePolicy.class); + EvictionListener evictionListenerMock = mock(EvictionListener.class); + tracker = new CacheContentsTracker(lowLevelCachePolicyMock); + tracker.setEvictionListener(evictionListenerMock); + } + + /** + * Tests parent CacheTag generation by checking each step when traversing from 3rd level + * partition to DB level. + */ + @Test + public void testParentCacheTagGeneration() { + CacheTag db = cacheTagBuilder("dbname"); + CacheTag table = cacheTagBuilder("dbname.tablename"); + CacheTag p = cacheTagBuilder("dbname.tablename", "p=v1"); + CacheTag pp = cacheTagBuilder("dbname.tablename", "p=v1", "pp=vv1"); + CacheTag ppp = cacheTagBuilder("dbname.tablename", "p=v1", "pp=vv1", "ppp=vvv1"); + + assertTrue(pp.compareTo(CacheTag.createParentCacheTag(ppp)) == 0); + assertTrue(p.compareTo(CacheTag.createParentCacheTag(pp)) == 0); + assertTrue(table.compareTo(CacheTag.createParentCacheTag(p)) == 0); + assertTrue(db.compareTo(CacheTag.createParentCacheTag(table)) == 0); + assertNull(CacheTag.createParentCacheTag(db)); + } + + /** + * Caches some mock buffers and checks summary produced by CacheContentsTracker. Later this is + * done again after some mock buffers were evicted. 
+ */ + @Test + public void testAggregatedStatsGeneration() { + cacheTestBuffers(); + StringBuilder sb = new StringBuilder(); + tracker.debugDumpShort(sb); + assertEquals(EXPECTED_CACHE_STATE_WHEN_FULL, sb.toString()); + + evictSomeTestBuffers(); + sb = new StringBuilder(); + tracker.debugDumpShort(sb); + assertEquals(EXPECTED_CACHE_STATE_AFTER_EVICTION, sb.toString()); + } + + private static LlapCacheableBuffer createMockBuffer(long size, CacheTag cacheTag) { + LlapCacheableBuffer llapCacheableBufferMock = mock(LlapCacheableBuffer.class); + + doAnswer(invocationOnMock -> { + return size; + }).when(llapCacheableBufferMock).getMemoryUsage(); + + doAnswer(invocationOnMock -> { + return cacheTag; + }).when(llapCacheableBufferMock).getTag(); + + return llapCacheableBufferMock; + } + + private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) { + if (partitions != null && partitions.length > 0) { + LinkedList parts = Arrays.stream(partitions).collect(toCollection(LinkedList::new)); + return CacheTag.build(dbAndTable, parts); + } else { + return CacheTag.build(dbAndTable); + } + } + + private static void cacheTestBuffers() { + tracker.cache(createMockBuffer(4 * 1024L, + cacheTagBuilder("default.testtable")), null); + tracker.cache(createMockBuffer(2 * 1024L, + cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1")), null); + tracker.cache(createMockBuffer(32 * 1024L, + cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1")), null); + tracker.cache(createMockBuffer(64 * 1024L, + cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv2")), null); + tracker.cache(createMockBuffer(128 * 1024L, + cacheTagBuilder("otherdb.testtable", "p=v2", "pp=vv1")), null); + tracker.cache(createMockBuffer(256 * 1024L, + cacheTagBuilder("otherdb.testtable2", "p=v3")), null); + tracker.cache(createMockBuffer(512 * 1024 * 1024L, + cacheTagBuilder("otherdb.testtable2", "p=v3")), null); + tracker.cache(createMockBuffer(1024 * 1024 * 1024L, + cacheTagBuilder("otherdb.testtable3")), null); + tracker.cache(createMockBuffer(2 * 1024 * 1024L, + cacheTagBuilder("default.testtable")), null); + } + + private static void evictSomeTestBuffers() { + tracker.notifyEvicted(createMockBuffer(32 * 1024L, + cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1"))); + tracker.notifyEvicted(createMockBuffer(512 * 1024 * 1024L, + cacheTagBuilder("otherdb.testtable2", "p=v3"))); + tracker.notifyEvicted(createMockBuffer(2 * 1024 * 1024L, + cacheTagBuilder("default.testtable"))); + tracker.notifyEvicted(createMockBuffer(4 * 1024L, + cacheTagBuilder("default.testtable"))); + } + + private static final String EXPECTED_CACHE_STATE_WHEN_FULL = + "\n" + + "Cache state: \n" + + "default : 2/2, 2101248/2101248\n" + + "default.testtable : 2/2, 2101248/2101248\n" + + "otherdb : 7/7, 1611106304/1611106304\n" + + "otherdb.testtable : 4/4, 231424/231424\n" + + "otherdb.testtable/p=v1 : 3/3, 100352/100352\n" + + "otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" + + "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" + + "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" + + "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" + + "otherdb.testtable2 : 2/2, 537133056/537133056\n" + + "otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" + + "otherdb.testtable3 : 1/1, 1073741824/1073741824"; + + private static final String EXPECTED_CACHE_STATE_AFTER_EVICTION = + "\n" + + "Cache state: \n" + + "default : 0/2, 0/2101248\n" + + "default.testtable : 0/2, 0/2101248\n" + + "otherdb : 5/7, 1074202624/1611106304\n" + + "otherdb.testtable : 
3/4, 198656/231424\n" + + "otherdb.testtable/p=v1 : 2/3, 67584/100352\n" + + "otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" + + "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" + + "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" + + "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" + + "otherdb.testtable2 : 1/2, 262144/537133056\n" + + "otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" + + "otherdb.testtable3 : 1/1, 1073741824/1073741824"; + +} diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java index af04a51b5536550b2d2f7d3e008cf2b2dea607d4..c887394b9751510b7251b51af4d5927935410827 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; @@ -61,7 +62,7 @@ new ConcurrentHashMap<>(); public static Path registerFile(DataCache cache, Path path, Object fileKey, - TreeMap index, Configuration conf, String tag) throws IOException { + TreeMap index, Configuration conf, CacheTag tag) throws IOException { long splitId = currentSplitId.incrementAndGet(); CacheAwareInputStream stream = new CacheAwareInputStream( cache, conf, index, path, fileKey, -1, tag); @@ -170,14 +171,14 @@ public boolean rename(Path arg0, Path arg1) throws IOException { private final TreeMap chunkIndex; private final Path path; private final Object fileKey; - private final String tag; + private final CacheTag tag; private final Configuration conf; private final DataCache cache; private final int bufferSize; private long position = 0; public CacheAwareInputStream(DataCache cache, Configuration conf, - TreeMap chunkIndex, Path path, Object fileKey, int bufferSize, String tag) { + TreeMap chunkIndex, Path path, Object fileKey, int bufferSize, CacheTag tag) { this.cache = cache; this.fileKey = fileKey; this.chunkIndex = chunkIndex; diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..a041426bba40adffe267f8ecf7896a9548eb2bd1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.hive.llap; + +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.CacheTag; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.mapred.JobConf; + +import com.google.common.collect.Lists; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Covers utility functions that are used by LLAP code and depend on Hive constructs, e.g. ql code. + */ +public final class LlapHiveUtils { + + public static final Logger LOG = LoggerFactory.getLogger(LlapHiveUtils.class); + + private LlapHiveUtils() { + // Not to be used. + } + + public static CacheTag getDbAndTableNameForMetrics(Path path, boolean includeParts, + Map<Path, PartitionDesc> parts) { + + assert(parts != null); + + // Look for PartitionDesc instance matching our Path + Path parentPath = path; + PartitionDesc part = parts.get(parentPath); + while (!parentPath.isRoot() && part == null) { + parentPath = parentPath.getParent(); + part = parts.get(parentPath); + } + + // Fallback to legacy cache tag creation logic. + if (part == null) { + return CacheTag.build(LlapUtil.getDbAndTableNameForMetrics(path, includeParts)); + } + + if (!includeParts || !part.isPartitioned()) { + return CacheTag.build(part.getTableName()); + } else { + return CacheTag.build(part.getTableName(), part.getPartSpec()); + } + } + + /** + * Returns MapWork based on what is serialized in the JobConf instance provided. + * @param job the JobConf instance carrying the serialized plan + * @return the MapWork instance. Might be null if missing. + * @throws HiveException + */ + public static MapWork findMapWork(JobConf job) throws HiveException { + String inputName = job.get(Utilities.INPUT_NAME, null); + if (LOG.isDebugEnabled()) { + LOG.debug("Initializing for input " + inputName); + } + String prefixes = job.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); + if (prefixes != null && !StringUtils.isBlank(prefixes)) { + // Currently SMB is broken, so we cannot check if it's compatible with IO elevator. + // So, we don't use the below code that would get the correct MapWork. See HIVE-16985.
+ if (inputName != null) { + if (prefixes == null || + !Lists.newArrayList(prefixes.split(",")).contains(inputName)) { + inputName = null; + } + } + if (inputName != null) { + work = Utilities.getMergeWork(job, inputName); + } + + if (!(work instanceof MapWork)) { + work = Utilities.getMapWork(job); + } + return (MapWork) work; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 241a3001e6e0002377736d6d0e820fde004b0bac..117e4e6eac70393cb41bd1fb58547cb8e3cc9045 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.Pool; import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; @@ -148,14 +149,14 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final IoTrace trace; private final TypeDescription fileSchema; private final WriterVersion version; - private final String tag; + private final CacheTag tag; private AtomicBoolean isStopped; private StoppableAllocator allocator; public EncodedReaderImpl(Object fileKey, List types, TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version, int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader, - PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException { + PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException { this.fileKey = fileKey; this.compressionKind = kind; this.isCompressed = kind != org.apache.orc.CompressionKind.NONE; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java index 210c987b7f580dacda5bdb487af9cf234a738b79..8d3336f68c8dfcbc2c5cd7087f36cfb663da646f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.Pool; import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; @@ -46,7 +47,7 @@ * @return The reader. */ EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader, - PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException; + PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException; /** The factory that can create (or return) the pools used by encoded reader. 
*/ public interface PoolFactory { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java index a9a9f101948a970e0dbf2f77eeb6f688a88d1cbd..e137c24479e533607ce21a6bb66bc361ee482ab4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.orc.CompressionCodec; import org.apache.orc.DataReader; @@ -35,7 +36,7 @@ public ReaderImpl(Path path, ReaderOptions options) throws IOException { @Override public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader, - PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException { + PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException { return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(), bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 61e2556b08fe4247f35673f24378505ada20a605..ea6dfb8a88107c51124c519ef75f0e4e29d352ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -22,13 +22,14 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.FileMetadataCache; import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapCacheAwareFs; -import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.LlapHiveUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; @@ -37,6 +38,9 @@ import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase; import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; @@ -78,6 +82,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -104,6 +109,7 @@ private Object[] partitionValues; private Path cacheFsPath; private static final int MAP_DEFINITION_LEVEL_MAX = 3; + private Map parts; /** * For each request column, the reader to read this column. 
This is NULL if this column @@ -170,13 +176,19 @@ private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOExc @SuppressWarnings("deprecation") public void initialize( InputSplit oldSplit, - JobConf configuration) throws IOException, InterruptedException { + JobConf configuration) throws IOException, InterruptedException, HiveException { // the oldSplit may be null during the split phase if (oldSplit == null) { return; } ParquetMetadata footer; List blocks; + + MapWork mapWork = LlapHiveUtils.findMapWork(jobConf); + if (mapWork != null) { + parts = mapWork.getPathToPartitionInfo(); + } + ParquetInputSplit split = (ParquetInputSplit) oldSplit; boolean indexAccess = configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false); @@ -190,7 +202,7 @@ public void initialize( // if task.side.metadata is set, rowGroupOffsets is null Object cacheKey = null; - String cacheTag = null; + CacheTag cacheTag = null; // TODO: also support fileKey in splits, like OrcSplit does if (metadataCache != null) { cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file, @@ -200,7 +212,7 @@ public void initialize( } if (cacheKey != null) { if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) { - cacheTag = LlapUtil.getDbAndTableNameForMetrics(file, true); + cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(file, true, parts); } // If we are going to use cache, change the path to depend on file ID for extra consistency. FileSystem fs = file.getFileSystem(configuration); @@ -265,7 +277,7 @@ public void initialize( } private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration, - List blocks, String tag) throws IOException { + List blocks, CacheTag tag) throws IOException { if (fileKey == null || cache == null) { return path; } @@ -292,7 +304,7 @@ private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration, } private ParquetMetadata readSplitFooter(JobConf configuration, final Path file, - Object cacheKey, MetadataFilter filter, String tag) throws IOException { + Object cacheKey, MetadataFilter filter, CacheTag tag) throws IOException { MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null : metadataCache.getFileMetadata(cacheKey); if (footerData != null) { diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java new file mode 100644 index 0000000000000000000000000000000000000000..623c1814b4f4524399a61d9d4ac01abcb9feb6af --- /dev/null +++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.common.io; + +import java.util.LinkedList; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; + +/** + * Used for identifying the related object (table or partition) of a buffer stored in the cache. + * Comes in 3 flavours to optimize for minimal memory overhead: + * - TableCacheTag for tables without partitions: DB/table level + * - SinglePartitionCacheTag for tables with 1 partition level: DB/table/1st_partition + * - MultiPartitionCacheTag for tables with >1 partition levels: + * DB/table/1st_partition/.../nth_partition. + */ +public abstract class CacheTag implements Comparable<CacheTag> { + /** + * Table name, prepended by the DB name and '.'. + */ + protected final String tableName; + + private CacheTag(String tableName) { + this.tableName = tableName.intern(); + } + + public String getTableName() { + return tableName; + } + + @Override + public int compareTo(CacheTag o) { + return tableName.compareTo(o.tableName); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof CacheTag)) { + return false; + } else { + return this.compareTo((CacheTag) obj) == 0; + } + } + + @Override + public int hashCode() { + int res = tableName.hashCode(); + return res; + } + + public static final CacheTag build(String tableName) { + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException(); + } + return new TableCacheTag(tableName); + } + + public static final CacheTag build(String tableName, Map<String, String> partDescMap) { + if (StringUtils.isEmpty(tableName) || partDescMap == null || partDescMap.isEmpty()) { + throw new IllegalArgumentException(); + } + + LinkedList<String> partDescList = new LinkedList<>(); + + for (Map.Entry<String, String> entry : partDescMap.entrySet()) { + StringBuilder sb = new StringBuilder(); + sb.append(entry.getKey()).append("=").append(entry.getValue()); + partDescList.add(sb.toString()); + } + + if (partDescList.size() == 1) { + return new SinglePartitionCacheTag(tableName, partDescList.get(0)); + } else { + // In this case it must be >1 + return new MultiPartitionCacheTag(tableName, partDescList); + } + } + + // Assumes elements of partDescList are already in p1=v1 format + public static final CacheTag build(String tableName, LinkedList<String> partDescList) { + if (StringUtils.isEmpty(tableName) || partDescList == null || partDescList.isEmpty()) { + throw new IllegalArgumentException(); + } + + if (partDescList.size() == 1) { + return new SinglePartitionCacheTag(tableName, partDescList.get(0)); + } else { + // In this case it must be >1 + return new MultiPartitionCacheTag(tableName, partDescList); + } + } + + /** + * Constructs a (fake) parent CacheTag instance by walking back in the hierarchy, i.e. stepping + * from inner to outer partition levels, then producing a CacheTag for the table and finally + * the DB.
+ */ + public static final CacheTag createParentCacheTag(CacheTag tag) { + if (tag == null) { + throw new IllegalArgumentException(); + } + + if (tag instanceof MultiPartitionCacheTag) { + MultiPartitionCacheTag multiPartitionCacheTag = (MultiPartitionCacheTag) tag; + if (multiPartitionCacheTag.partitionDesc.size() > 2) { + LinkedList<String> subList = new LinkedList<>(multiPartitionCacheTag.partitionDesc); + subList.removeLast(); + return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName, subList); + } else { + return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName, + multiPartitionCacheTag.partitionDesc.get(0)); + } + } + + if (tag instanceof SinglePartitionCacheTag) { + return new TableCacheTag(tag.tableName); + } else { + // DB level + int ix = tag.tableName.indexOf("."); + if (ix <= 0) { + return null; + } + return new TableCacheTag(tag.tableName.substring(0, ix)); + } + + } + + /** + * CacheTag for tables without partitions. + */ + public static final class TableCacheTag extends CacheTag { + + private TableCacheTag(String tableName) { + super(tableName); + } + + @Override + public int compareTo(CacheTag o) { + if (o instanceof SinglePartitionCacheTag || o instanceof MultiPartitionCacheTag) { + return -1; + } else { + return super.compareTo(o); + } + } + + } + + /** + * CacheTag for tables with partitions. + */ + public abstract static class PartitionCacheTag extends CacheTag { + + private PartitionCacheTag(String tableName) { + super(tableName); + } + + /** + * Returns a pretty printed String version of the partitionDesc in the format of p1=v1/p2=v2... + * @return the pretty printed String + */ + public abstract String partitionDescToString(); + + } + + /** + * CacheTag for tables with exactly one partition level. + */ + public static final class SinglePartitionCacheTag extends PartitionCacheTag { + + private final String partitionDesc; + + private SinglePartitionCacheTag(String tableName, String partitionDesc) { + super(tableName); + if (StringUtils.isEmpty(partitionDesc)) { + throw new IllegalArgumentException(); + } + this.partitionDesc = partitionDesc.intern(); + } + + @Override + public String partitionDescToString() { + return this.partitionDesc; + } + + @Override + public int compareTo(CacheTag o) { + if (o instanceof TableCacheTag) { + return 1; + } else if (o instanceof MultiPartitionCacheTag) { + return -1; + } + SinglePartitionCacheTag other = (SinglePartitionCacheTag) o; + // Order by table name first, then by partition spec. + int tableCmp = super.compareTo(o); + if (tableCmp != 0) { + return tableCmp; + } + return partitionDesc.compareTo(other.partitionDesc); + } + + @Override + public int hashCode() { + return super.hashCode() + partitionDesc.hashCode(); + } + } + + /** + * CacheTag for tables with more than one partition level.
+ */ + public static final class MultiPartitionCacheTag extends PartitionCacheTag { + + private final LinkedList<String> partitionDesc; + + private MultiPartitionCacheTag(String tableName, LinkedList<String> partitionDesc) { + super(tableName); + if (partitionDesc != null && partitionDesc.size() > 1) { + // Intern the partition spec strings to reduce memory footprint. + partitionDesc.replaceAll(String::intern); + } else { + throw new IllegalArgumentException(); + } + this.partitionDesc = partitionDesc; + } + + @Override + public int compareTo(CacheTag o) { + if (o instanceof TableCacheTag || o instanceof SinglePartitionCacheTag) { + return 1; + } + MultiPartitionCacheTag other = (MultiPartitionCacheTag) o; + // Order by table name first, then by the partition spec list. + int tableCmp = super.compareTo(o); + if (tableCmp != 0) { + return tableCmp; + } + return partitionDesc.toString().compareTo(other.partitionDesc.toString()); + } + + @Override + public int hashCode() { + int res = super.hashCode(); + for (String p : partitionDesc) { + res += p.hashCode(); + } + return res; + } + + @Override + public String partitionDescToString() { + return String.join("/", this.partitionDesc); + } + + } + +} + diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java index 2ac0a18a5026e76e65c3c3a8b81d5a844c472ed2..9b23a710c46f761ed04993c40af21eb9cfe5d593 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java @@ -110,5 +110,5 @@ DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset, * the replacement chunks from cache are updated directly in the array. */ long[] putFileData(Object fileKey, DiskRange[] ranges, - MemoryBuffer[] data, long baseOffset, String tag); + MemoryBuffer[] data, long baseOffset, CacheTag tag); } diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java index d7de3619380d24a1aeea2bac9a66485d7d468517..469172237e6c002790aa311834c6672baab4544d 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java @@ -42,10 +42,10 @@ MemoryBufferOrBuffers putFileMetadata( @Deprecated MemoryBufferOrBuffers putFileMetadata( - Object fileKey, int length, InputStream is, String tag) throws IOException; + Object fileKey, int length, InputStream is, CacheTag tag) throws IOException; @Deprecated - MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag); + MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, CacheTag tag); /** * Releases the buffer returned from getFileMetadata or putFileMetadata method. @@ -61,8 +61,8 @@ MemoryBufferOrBuffers putFileMetadata( * The caller must decref this buffer when done. */ MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, - String tag, AtomicBoolean isStopped); + CacheTag tag, AtomicBoolean isStopped); MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, - InputStream is, String tag, AtomicBoolean isStopped) throws IOException; + InputStream is, CacheTag tag, AtomicBoolean isStopped) throws IOException; }
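Reviewer note: below is a minimal usage sketch of the CacheTag API added in this patch, showing how createParentCacheTag() walks from a multi-level partition tag up to the table and then the DB level. The database, table and partition names ("default.sales", year/month) are purely illustrative and do not come from the patch.

    import java.util.LinkedList;

    import org.apache.hadoop.hive.common.io.CacheTag;

    public class CacheTagExample {
      public static void main(String[] args) {
        // Hypothetical partition spec values, already in the expected p1=v1 format.
        LinkedList<String> partDescList = new LinkedList<>();
        partDescList.add("year=2019");
        partDescList.add("month=01");

        CacheTag multi = CacheTag.build("default.sales", partDescList); // MultiPartitionCacheTag
        CacheTag single = CacheTag.createParentCacheTag(multi);         // default.sales / year=2019
        CacheTag table = CacheTag.createParentCacheTag(single);         // default.sales
        CacheTag db = CacheTag.createParentCacheTag(table);             // default
        CacheTag root = CacheTag.createParentCacheTag(db);              // null once the DB level is reached

        System.out.println(table.getTableName());                       // prints "default.sales"
        System.out.println(root == null);                               // prints "true"
      }
    }

The same parent-walking is what the cache content tracking relies on to aggregate per-tag statistics from partition tags up to table- and DB-level summaries.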