diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index fcfc22a712..a803cbf4cb 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; @@ -45,9 +46,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; public final class BuddyAllocator - implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapIoDebugDump { + implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapIoDebugDump { private final Arena[] arenas; private final AtomicInteger allocatedArenas = new AtomicInteger(0); @@ -224,16 +226,24 @@ public int validateAndDetermineArenaSize(int arenaCount, long maxSizeVal) { return (int)arenaSizeVal; } + + @VisibleForTesting @Override public void allocateMultiple(MemoryBuffer[] dest, int size) throws AllocatorOutOfMemoryException { - allocateMultiple(dest, size, null); + allocateMultiple(dest, size, null, null); } - // TODO: would it make sense to return buffers asynchronously? +// TODO# verify not called @Override public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory) throws AllocatorOutOfMemoryException { + allocateMultiple(dest, size, factory, null); + } + + @Override + public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped) + throws AllocatorOutOfMemoryException { assert size > 0 : "size is " + size; if (size > maxAllocation) { throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation); @@ -243,7 +253,7 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory int allocationSize = 1 << allocLog2; // If using async, we could also reserve one by one. - memoryManager.reserveMemory(dest.length << allocLog2); + memoryManager.reserveMemory(dest.length << allocLog2, isStopped); for (int i = 0; i < dest.length; ++i) { if (dest[i] != null) continue; // Note: this is backward compat only. Should be removed with createUnallocated. 
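The four-argument overload added above is the entry point IO threads use once they carry a stop flag; the two- and three-argument variants now simply delegate to it with a null flag. As a caller-side illustration (the class below is a sketch for this write-up, not code from the patch), the pattern that EncodedReaderImpl and CacheOutputReceiver adopt later in this diff looks roughly like this:

// Illustrative sketch only; StoppableAllocationSketch is not part of the patch.
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;

class StoppableAllocationSketch {
  private final AtomicBoolean isStopped = new AtomicBoolean(false);

  void allocate(Allocator allocator, MemoryBuffer[] dest, int size) {
    if (allocator instanceof StoppableAllocator) {
      // Pass the stop flag so a reservation that is waiting for eviction can bail out after stop().
      ((StoppableAllocator) allocator).allocateMultiple(dest, size, null, isStopped);
    } else {
      // Allocators that do not implement StoppableAllocator keep the old, non-stoppable path.
      allocator.allocateMultiple(dest, size, null);
    }
  }

  void stop() {
    isStopped.set(true); // observed by the memory manager on its next retry
  }
}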
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index 4297cfc61d..c1c24b7d52 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.llap.cache; import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -49,18 +50,26 @@ public LowLevelCacheMemoryManager( } } + public static class ReserveFailedException extends RuntimeException { + private static final long serialVersionUID = 1L; + public ReserveFailedException(AtomicBoolean isStopped) { + super("Cannot reserve memory" + + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "") + + ((isStopped != null && isStopped.get()) ? "; thread stopped" : "")); + } + } @Override - public void reserveMemory(final long memoryToReserve) { - boolean result = reserveMemory(memoryToReserve, true); + public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) { + boolean result = reserveMemory(memoryToReserve, true, isStopped); if (result) return; // Can only happen if there's no evictor, or if thread is interrupted. - throw new RuntimeException("Cannot reserve memory" - + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")); + throw new ReserveFailedException(isStopped); } @VisibleForTesting - public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) { + public boolean reserveMemory(final long memoryToReserve, + boolean waitForEviction, AtomicBoolean isStopped) { // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point? 
int badCallCount = 0; int nextLog = 4; @@ -83,17 +92,10 @@ public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction result = false; break; } - ++badCallCount; - if (badCallCount == nextLog) { - LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?"); - nextLog <<= 1; - try { - Thread.sleep(Math.min(1000, nextLog)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - result = false; - break; - } + + if (isStopped != null && isStopped.get()) { + result = false; + break; } continue; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java index 542041d133..fedade5c9c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicBoolean; + public interface MemoryManager { void releaseMemory(long memUsage); void updateMaxSize(long maxSize); - void reserveMemory(long memoryToReserve); + void reserveMemory(long memoryToReserve, AtomicBoolean isStopped); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index b76b0ded98..e8a3b40d8d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -173,7 +174,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { * Contains only stripes that are read, and only columns included. null => read all RGs. */ private boolean[][] stripeRgs; - private volatile boolean isStopped = false; + private AtomicBoolean isStopped = new AtomicBoolean(false); @SuppressWarnings("unused") private volatile boolean isPaused = false; @@ -240,7 +241,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff @Override public void stop() { LOG.debug("Encoded reader is being stopped"); - isStopped = true; + isStopped.set(true); } @Override @@ -436,6 +437,7 @@ private void ensureDataReader() throws IOException { stripeReader = orcReader.encodedReader( fileKey, dw, dw, useObjectPools ? 
POOL_FACTORY : null, trace, useCodecPool, cacheTag); stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled()); + stripeReader.setStopped(isStopped); } private void recordReaderTime(long startTime) { @@ -454,7 +456,7 @@ private void validateFileMetadata() throws IOException { } private boolean processStop() { - if (!isStopped) return false; + if (!isStopped.get()) return false; LOG.info("Encoded data reader is stopping"); tracePool.offer(trace); cleanupReaders(); @@ -584,7 +586,7 @@ private OrcFileMetadata getFileFooterFromCacheOrDisk() throws IOException { ensureOrcReader(); ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter(); if (hasCache) { - tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag); + tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag, isStopped); metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer. } FileTail ft = orcReader.getFileTail(); @@ -677,7 +679,7 @@ private OrcStripeMetadata createOrcStripeMetadataObject(int stripeIx, StripeInfo assert footerRange.next == null; // Can only happens w/zcr for a single input buffer. if (hasCache) { LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail( - stripeKey, footerRange.getData().duplicate(), cacheTag); + stripeKey, footerRange.getData().duplicate(), cacheTag, isStopped); metadataCache.decRefBuffer(cacheBuf); // We don't use this one. } ByteBuffer bb = footerRange.getData().duplicate(); @@ -918,7 +920,7 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, return lowLevelCache.putFileData( fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag); } else if (metadataCache != null) { - metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset); + metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset, isStopped); } return null; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 5b54af5d3b..257617554d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -68,6 +69,7 @@ import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; +import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; @@ -150,7 +152,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final String cacheTag; private final FileSystem fs; - private volatile boolean isStopped = false; + private AtomicBoolean isStopped = new AtomicBoolean(false); private final Deserializer sourceSerDe; private final InputFormat sourceInputFormat; private final Reporter reporter; @@ -245,7 +247,7 @@ private static int determineAllocSize(BufferUsageManager bufferManager, Configur @Override public void stop() { LlapIoImpl.LOG.debug("Encoded reader is being stopped"); - isStopped = true; + isStopped.set(true); } @Override 
@@ -344,16 +346,18 @@ public String toCoordinateString() { private final Map streams = new HashMap<>(); private final Map> colStreams = new HashMap<>(); private final boolean doesSourceHaveIncludes; + private final AtomicBoolean isStopped; public CacheWriter(BufferUsageManager bufferManager, List columnIds, boolean[] writerIncludes, boolean doesSourceHaveIncludes, - Allocator.BufferObjectFactory bufferFactory) { + Allocator.BufferObjectFactory bufferFactory, AtomicBoolean isStopped) { this.bufferManager = bufferManager; assert writerIncludes != null; // Taken care of on higher level. this.writerIncludes = writerIncludes; this.doesSourceHaveIncludes = doesSourceHaveIncludes; this.columnIds = columnIds; this.bufferFactory = bufferFactory; + this.isStopped = isStopped; startStripe(); } @@ -440,7 +444,7 @@ public OutputReceiver createDataStream(StreamName name) throws IOException { if (LlapIoImpl.LOG.isTraceEnabled()) { LlapIoImpl.LOG.trace("Creating cache receiver for " + name); } - CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, bufferFactory, name); + CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, bufferFactory, isStopped); or = cor; List list = colStreams.get(name.getColumn()); if (list == null) { @@ -597,12 +601,17 @@ public long getFileBytes(int column) { private List buffers = null; private int lastBufferPos = -1; private boolean suppressed = false; + private final AtomicBoolean isStopped; + private final StoppableAllocator allocator; public CacheOutputReceiver(BufferUsageManager bufferManager, - BufferObjectFactory bufferFactory, StreamName name) { + StreamName name, BufferObjectFactory bufferFactory, AtomicBoolean isStopped) { this.bufferManager = bufferManager; this.bufferFactory = bufferFactory; + Allocator alloc = bufferManager.getAllocator(); + this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null; this.name = name; + this.isStopped = isStopped; } public void clear() { @@ -617,6 +626,15 @@ public void suppress() { lastBufferPos = -1; } + private void allocateMultiple(MemoryBuffer[] dest, int size) { + if (allocator != null) { + allocator.allocateMultiple(dest, size, bufferFactory, isStopped); + } else { + bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory); + } + } + + @Override public void output(ByteBuffer buffer) throws IOException { // TODO: avoid put() by working directly in OutStream? @@ -640,7 +658,7 @@ public void output(ByteBuffer buffer) throws IOException { boolean isNewBuffer = (lastBufferPos == -1); if (isNewBuffer) { MemoryBuffer[] dest = new MemoryBuffer[1]; - bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory); + allocateMultiple(dest, size); LlapSerDeDataBuffer newBuffer = (LlapSerDeDataBuffer)dest[0]; bb = newBuffer.getByteBufferRaw(); lastBufferPos = bb.position(); @@ -1417,7 +1435,7 @@ public void startReadSplitFromFile( // TODO: move this into ctor? EW would need to create CacheWriter then List cwColIds = writer.isOnlyWritingIncludedColumns() ? 
splitColumnIds : columnIds; writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes, - writer.isOnlyWritingIncludedColumns(), bufferFactory), daemonConf, split.getPath()); + writer.isOnlyWritingIncludedColumns(), bufferFactory, isStopped), daemonConf, split.getPath()); if (writer instanceof VectorDeserializeOrcWriter) { VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer; asyncWriter.startAsync(new AsyncCacheDataCallback()); @@ -1673,7 +1691,7 @@ private void recordReaderTime(long startTime) { } private boolean processStop() { - if (!isStopped) return false; + if (!isStopped.get()) return false; LlapIoImpl.LOG.info("SerDe-based data reader is stopping"); cleanup(true); return true; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java index 426d599b29..dd4498f341 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java @@ -28,11 +28,13 @@ import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.llap.cache.BuddyAllocator; import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer; @@ -43,6 +45,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; +import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; public class MetadataCache implements LlapIoDebugDump, FileMetadataCache { private final ConcurrentHashMap metadata = @@ -51,10 +54,10 @@ private final ConcurrentHashMap estimateErrors; private final MemoryManager memoryManager; private final LowLevelCachePolicy policy; - private final EvictionAwareAllocator allocator; + private final BuddyAllocator allocator; private final LlapDaemonCacheMetrics metrics; - public MetadataCache(EvictionAwareAllocator allocator, MemoryManager memoryManager, + public MetadataCache(BuddyAllocator allocator, MemoryManager memoryManager, LowLevelCachePolicy policy, boolean useEstimateCache, LlapDaemonCacheMetrics metrics) { this.memoryManager = memoryManager; this.allocator = allocator; @@ -64,7 +67,7 @@ public MetadataCache(EvictionAwareAllocator allocator, MemoryManager memoryManag ? 
new ConcurrentHashMap() : null; } - public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) { + public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset, AtomicBoolean isStopped) { if (estimateErrors == null) return; OrcFileEstimateErrors errorData = estimateErrors.get(fileKey); boolean isNew = false; @@ -76,7 +79,7 @@ public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset errorData.addError(range.getOffset(), range.getLength(), baseOffset); } long memUsage = errorData.estimateMemoryUsage(); - memoryManager.reserveMemory(memUsage); + memoryManager.reserveMemory(memUsage, isStopped); OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData); if (old != null) { errorData = old; @@ -150,34 +153,50 @@ private LlapBufferOrBuffers getInternal(Object key) { } @Override - public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) { - return putInternal(fileKey, tailBuffer, null); + public MemoryBufferOrBuffers putFileMetadata(Object fileKey, + ByteBuffer tailBuffer) { + return putInternal(fileKey, tailBuffer, null, null); } @Override - public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag) { - return putInternal(fileKey, tailBuffer, tag); + public MemoryBufferOrBuffers putFileMetadata(Object fileKey, + ByteBuffer tailBuffer, String tag) { + return putInternal(fileKey, tailBuffer, tag, null); + } + + @Override + public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, + InputStream is) throws IOException { + return putFileMetadata(fileKey, length, is, null, null); } public LlapBufferOrBuffers putStripeTail( - OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag) { - return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag); + OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { + return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag, isStopped); + } + + @Override + public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, + InputStream is, String tag) throws IOException { + return putFileMetadata(fileKey, length, is, tag, null); } + @Override - public LlapBufferOrBuffers putFileMetadata( - Object fileKey, int length, InputStream is) throws IOException { - return putFileMetadata(fileKey, length, is, null); + public LlapBufferOrBuffers putFileMetadata(Object fileKey, + ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { + return putInternal(fileKey, tailBuffer, tag, isStopped); + } @Override - public LlapBufferOrBuffers putFileMetadata( - Object fileKey, int length, InputStream is, String tag) throws IOException { + public LlapBufferOrBuffers putFileMetadata(Object fileKey, int length, InputStream is, + String tag, AtomicBoolean isStopped) throws IOException { LlapBufferOrBuffers result = null; while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapBufferOrBuffers oldVal = metadata.get(fileKey); if (oldVal == null) { - result = wrapBbForFile(result, fileKey, length, is, tag); + result = wrapBbForFile(result, fileKey, length, is, tag, isStopped); if (!lockBuffer(result, false)) { throw new AssertionError("Cannot lock a newly created value " + result); } @@ -198,7 +217,7 @@ public LlapBufferOrBuffers putFileMetadata( @SuppressWarnings({ "rawtypes", "unchecked" }) private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, - Object fileKey, int length, InputStream stream, String tag) throws IOException { + Object fileKey, int length, InputStream stream, String tag, AtomicBoolean isStopped) throws IOException { if (result != null) return result; int maxAlloc = allocator.getMaxAllocation(); LlapMetadataBuffer[] largeBuffers = null; @@ -207,7 +226,7 @@ private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, for (int i = 0; i < largeBuffers.length; ++i) { largeBuffers[i] = new LlapMetadataBuffer(fileKey, tag); } - allocator.allocateMultiple(largeBuffers, maxAlloc, null); + allocator.allocateMultiple(largeBuffers, maxAlloc, null, isStopped); for (int i = 0; i < largeBuffers.length; ++i) { readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]); } @@ -218,7 +237,7 @@ private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result, } else { LlapMetadataBuffer[] smallBuffer = new LlapMetadataBuffer[1]; smallBuffer[0] = new LlapMetadataBuffer(fileKey, tag); - allocator.allocateMultiple(smallBuffer, length, null); + allocator.allocateMultiple(smallBuffer, length, null, isStopped); readIntoCacheBuffer(stream, smallSize, smallBuffer[0]); if (largeBuffers == null) { return smallBuffer[0]; // This is the overwhelmingly common case. @@ -243,12 +262,12 @@ private static void readIntoCacheBuffer( bb.position(pos); } - private LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag) { + private LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { LlapBufferOrBuffers result = null; while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). LlapBufferOrBuffers oldVal = metadata.get(key); if (oldVal == null) { - result = wrapBb(result, key, tailBuffer, tag); + result = wrapBb(result, key, tailBuffer, tag, isStopped); oldVal = metadata.putIfAbsent(key, result); if (oldVal == null) { cacheInPolicy(result); // Cached successfully, add to policy. @@ -306,11 +325,11 @@ public void decRefBuffer(MemoryBufferOrBuffers buffer) { } private LlapBufferOrBuffers wrapBb( - LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag) { + LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) { if (result != null) return result; if (tailBuffer.remaining() <= allocator.getMaxAllocation()) { // The common case by far. 
- return wrapSmallBb(new LlapMetadataBuffer(key, tag), tailBuffer); + return wrapSmallBb(new LlapMetadataBuffer(key, tag), tailBuffer, isStopped); } else { int allocCount = determineAllocCount(tailBuffer); @SuppressWarnings("unchecked") @@ -318,22 +337,24 @@ public void decRefBuffer(MemoryBufferOrBuffers buffer) { for (int i = 0; i < allocCount; ++i) { results[i] = new LlapMetadataBuffer(key, tag); } - wrapLargeBb(results, tailBuffer); + wrapLargeBb(results, tailBuffer, isStopped); return new LlapMetadataBuffers(results); } } - private T wrapSmallBb(T result, ByteBuffer tailBuffer) { + private T wrapSmallBb(T result, ByteBuffer tailBuffer, + AtomicBoolean isStopped) { // Note: we pass in null factory because we allocate objects here. We could also pass a // per-call factory that would set fileKey; or set it after put. - allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null); + allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null, isStopped); return putBufferToDest(tailBuffer.duplicate(), result); } - private void wrapLargeBb(T[] results, ByteBuffer tailBuffer) { + private void wrapLargeBb(T[] results, ByteBuffer tailBuffer, + AtomicBoolean isStopped) { // Note: we pass in null factory because we allocate objects here. We could also pass a // per-call factory that would set fileKey; or set it after put. - allocator.allocateMultiple(results, allocator.getMaxAllocation(), null); + allocator.allocateMultiple(results, allocator.getMaxAllocation(), null, isStopped); ByteBuffer src = tailBuffer.duplicate(); int pos = src.position(), remaining = src.remaining(); for (int i = 0; i < results.length; ++i) { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java index 1e6f3ac96d..b3179c0a74 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; @@ -58,7 +59,7 @@ public TestBuddyAllocator(boolean direct, boolean mmap) { static class DummyMemoryManager implements MemoryManager { @Override - public void reserveMemory(long memoryToReserve) { + public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { } @Override diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 6eb2eb5089..923042d88c 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -85,7 +85,7 @@ public void run() { listLock.unlock(); } // Now try to evict with locked buffer still in the list. - mm.reserveMemory(1, false); + mm.reserveMemory(1, false, null); assertSame(buffer2, et.evicted.get(0)); unlock(lrfu, buffer1); } @@ -237,7 +237,7 @@ public void testDeadlockResolution() { // Lock the lowest priority buffer; try to evict - we'll evict some other buffer. 
LlapDataBuffer locked = inserted.get(0); lock(lrfu, locked); - mm.reserveMemory(1, false); + mm.reserveMemory(1, false, null); LlapDataBuffer evicted = et.evicted.get(0); assertNotNull(evicted); assertTrue(evicted.isInvalid()); @@ -248,7 +248,7 @@ public void testDeadlockResolution() { // Buffers in test are fakes not linked to cache; notify cache policy explicitly. public boolean cache(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapDataBuffer buffer) { - if (mm != null && !mm.reserveMemory(1, false)) { + if (mm != null && !mm.reserveMemory(1, false, null)) { return false; } buffer.incRef(); @@ -337,7 +337,7 @@ private void testHeapSize(int heapSize) { lock(lrfu, buf); } assertEquals(heapSize, m.cacheUsed.get()); - assertFalse(mm.reserveMemory(1, false)); + assertFalse(mm.reserveMemory(1, false, null)); if (!et.evicted.isEmpty()) { assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty()); } @@ -362,13 +362,13 @@ private void verifyOrder(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy // Evict all blocks. et.evicted.clear(); for (int i = 0; i < inserted.size(); ++i) { - assertTrue(mm.reserveMemory(1, false)); + assertTrue(mm.reserveMemory(1, false, null)); if (cacheUsed != null) { assertEquals(inserted.size(), cacheUsed.get()); } } // The map should now be empty. - assertFalse(mm.reserveMemory(1, false)); + assertFalse(mm.reserveMemory(1, false, null)); if (cacheUsed != null) { assertEquals(inserted.size(), cacheUsed.get()); } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java index df20f20c8f..aa9d6ed970 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java @@ -21,10 +21,14 @@ import java.nio.ByteBuffer; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers; @@ -75,8 +79,11 @@ public void debugDumpShort(StringBuilder sb) { } private static class DummyMemoryManager implements MemoryManager { + private int allocs; + @Override - public void reserveMemory(long memoryToReserve) { + public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { + ++allocs; } @Override @@ -102,11 +109,11 @@ public void testBuffers() throws Exception { ByteBuffer smallBuffer = ByteBuffer.allocate(MAX_ALLOC - 1); rdm.nextBytes(smallBuffer.array()); - LlapBufferOrBuffers result = cache.putFileMetadata(fileKey1, smallBuffer); + LlapBufferOrBuffers result = cache.putFileMetadata(fileKey1, smallBuffer, null, null); cache.decRefBuffer(result); ByteBuffer cacheBuf = result.getSingleBuffer().getByteBufferDup(); assertEquals(smallBuffer, cacheBuf); - result = cache.putFileMetadata(fileKey1, smallBuffer); + result = cache.putFileMetadata(fileKey1, smallBuffer, null, null); cache.decRefBuffer(result); cacheBuf = result.getSingleBuffer().getByteBufferDup(); assertEquals(smallBuffer, cacheBuf); @@ -120,7 +127,7 @@ public void testBuffers() throws Exception { ByteBuffer largeBuffer = 
ByteBuffer.allocate((int)(MAX_ALLOC * 2.5)); rdm.nextBytes(largeBuffer.array()); - result = cache.putFileMetadata(fileKey1, largeBuffer); + result = cache.putFileMetadata(fileKey1, largeBuffer, null, null); cache.decRefBuffer(result); assertNull(result.getSingleBuffer()); assertEquals(largeBuffer, extractResultBbs(result)); @@ -162,13 +169,13 @@ public void testIncompleteCbs() throws Exception { Object fileKey1 = new Object(); // Note: incomplete CBs are always an exact match. - cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(0, 3) }, 0); + cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(0, 3) }, 0, null); cp.verifyEquals(1); DiskRangeList result = cache.getIncompleteCbs( fileKey1, new DiskRangeList(0, 3), 0, gotAllData); assertTrue(gotAllData.value); verifyResult(result, INCOMPLETE, 0, 3); - cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(5, 6) }, 0); + cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(5, 6) }, 0, null); cp.verifyEquals(3); DiskRangeList ranges = new DiskRangeList(0, 3); ranges.insertAfter(new DiskRangeList(4, 6)); diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java index dcb24b8018..8370aa6230 100644 --- ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java +++ ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java @@ -284,6 +284,7 @@ public DiskRangeList createCacheChunk( int extraOffsetInChunk = 0; if (maxAlloc < chunkLength) { largeBuffers = new MemoryBuffer[largeBufCount]; + // Note: we don't use StoppableAllocator here - this is not on an IO thread. allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory()); for (int i = 0; i < largeBuffers.length; ++i) { // By definition here we copy up to the limit of the buffer. @@ -301,6 +302,7 @@ public DiskRangeList createCacheChunk( largeBuffers = null; if (smallSize > 0) { smallBuffer = new MemoryBuffer[1]; + // Note: we don't use StoppableAllocator here - this is not on an IO thread. 
allocator.allocateMultiple(smallBuffer, smallSize, cache.getDataBufferFactory()); ByteBuffer bb = smallBuffer[0].getByteBufferRaw(); copyDiskDataToCacheBuffer(array, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java index f6b949e51b..f3699f9ccf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.orc.StripeInformation; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; @@ -68,4 +69,6 @@ void readEncodedColumns(int stripeIx, StripeInformation stripe, void readIndexStreams(OrcIndex index, StripeInformation stripe, List streams, boolean[] included, boolean[] sargColumns) throws IOException; + + void setStopped(AtomicBoolean isStopped); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 348f9df773..2fdab5cc15 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -25,11 +25,13 @@ import java.util.Collection; import java.util.IdentityHashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.Pool; import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; +import org.apache.hadoop.hive.common.io.Allocator; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; @@ -146,6 +148,8 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final TypeDescription fileSchema; private final WriterVersion version; private final String tag; + private AtomicBoolean isStopped; + private StoppableAllocator allocator; public EncodedReaderImpl(Object fileKey, List types, TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version, @@ -162,6 +166,8 @@ public EncodedReaderImpl(Object fileKey, List types, this.bufferSize = bufferSize; this.rowIndexStride = strideRate; this.cacheWrapper = cacheWrapper; + Allocator alloc = cacheWrapper.getAllocator(); + this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null; this.dataReader = dataReader; this.trace = trace; this.tag = tag; @@ -890,8 +896,7 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon targetBuffers[ix] = chunk.getBuffer(); ++ix; } - cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize, - cacheWrapper.getDataBufferFactory()); + allocateMultiple(targetBuffers, bufferSize); // 4. Now decompress (or copy) the data into cache buffers. for (ProcCacheChunk chunk : toDecompress) { @@ -1166,8 +1171,7 @@ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList s cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these. ++ix; } - cacheWrapper.getAllocator().allocateMultiple(targetBuffers, - (int)(partCount == 1 ? streamLen : partSize), cacheWrapper.getDataBufferFactory()); + allocateMultiple(targetBuffers, (int)(partCount == 1 ? streamLen : partSize)); // 4. 
Now copy the data into cache buffers. ix = 0; @@ -1220,8 +1224,7 @@ private CacheChunk copyAndReplaceCandidateToNonCached( // non-cached. Since we are at the first gap, the previous stuff must be contiguous. singleAlloc[0] = null; trace.logPartialUncompressedData(partOffset, candidateEnd, true); - cacheWrapper.getAllocator().allocateMultiple( - singleAlloc, (int)(candidateEnd - partOffset), cacheWrapper.getDataBufferFactory()); + allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset)); MemoryBuffer buffer = singleAlloc[0]; cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); @@ -1230,12 +1233,19 @@ private CacheChunk copyAndReplaceCandidateToNonCached( return tcc; } + private void allocateMultiple(MemoryBuffer[] dest, int size) { + if (allocator != null) { + allocator.allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory(), isStopped); + } else { + cacheWrapper.getAllocator().allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory()); + } + } + private CacheChunk copyAndReplaceUncompressedToNonCached( BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) { singleAlloc[0] = null; trace.logPartialUncompressedData(bc.getOffset(), bc.getEnd(), false); - cacheWrapper.getAllocator().allocateMultiple( - singleAlloc, bc.getLength(), cacheWrapper.getDataBufferFactory()); + allocateMultiple(singleAlloc, bc.getLength()); MemoryBuffer buffer = singleAlloc[0]; cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); @@ -2110,4 +2120,9 @@ private static boolean hadBadBloomFilters(TypeDescription.Category category, return false; } } + + @Override + public void setStopped(AtomicBoolean isStopped) { + this.isStopped = isStopped; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java new file mode 100644 index 0000000000..0806d78759 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc.encoded; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; + +public interface StoppableAllocator extends Allocator { + /** Stoppable allocate method specific to branch-2. 
*/ + void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, + AtomicBoolean isStopped) throws AllocatorOutOfMemoryException; +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index f64efe26f5..8c49056111 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -253,7 +253,7 @@ public void initialize( colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration); requestedSchema = DataWritableReadSupport .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration); - + Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag); this.reader = new ParquetFileReader( configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns()); @@ -317,7 +317,8 @@ private ParquetMetadata readSplitFooter(JobConf configuration, final Path file, if (LOG.isInfoEnabled()) { LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey); } - footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag); + // Note: we don't pass in isStopped here - this is not on an IO thread. + footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag, null); try { return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter); } finally { diff --git storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java index d1da7f5de8..e4aa888f67 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java +++ storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import java.io.IOException; import java.io.InputStream; @@ -39,6 +40,13 @@ MemoryBufferOrBuffers putFileMetadata( @Deprecated MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer); + @Deprecated + MemoryBufferOrBuffers putFileMetadata( + Object fileKey, int length, InputStream is, String tag) throws IOException; + + @Deprecated + MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag); + /** * Releases the buffer returned from getFileMetadata or putFileMetadata method. * @param buffer The buffer to release. @@ -54,8 +62,9 @@ MemoryBufferOrBuffers putFileMetadata( * @return The buffer or buffers representing the cached footer. * The caller must decref this buffer when done. */ - MemoryBufferOrBuffers putFileMetadata( - Object fileKey, int length, InputStream is, String tag) throws IOException; + MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, + String tag, AtomicBoolean isStopped); - MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag); -} \ No newline at end of file + MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length, + InputStream is, String tag, AtomicBoolean isStopped) throws IOException; +}
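Taken together, the memory-manager change means a reservation that cannot make progress now gives up when the owning reader has been stopped, instead of sleeping and logging warnings indefinitely. A condensed paraphrase of the new LowLevelCacheMemoryManager.reserveMemory loop is shown below; the helper names tryIncrementUsedMemory and evictor.evictSomeBlocks are stand-ins for the real fields and accounting, so treat this as a sketch of the control flow rather than the literal implementation.

// Paraphrased control flow only; helper names here are illustrative.
boolean reserveMemory(long memoryToReserve, boolean waitForEviction, AtomicBoolean isStopped) {
  while (true) {
    if (tryIncrementUsedMemory(memoryToReserve)) {
      return true;                        // the reservation fits under the configured maximum
    }
    long evicted = evictor.evictSomeBlocks(memoryToReserve);
    if (evicted > 0) {
      continue;                           // eviction freed something, retry the reservation
    }
    if (!waitForEviction) {
      return false;                       // non-blocking attempt, as used by the unit tests
    }
    if (isStopped != null && isStopped.get()) {
      return false;                       // the reader was stopped, so stop retrying
    }
    // Otherwise keep retrying; reserveMemory(long, AtomicBoolean) turns a false result
    // into a ReserveFailedException for the caller.
  }
}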