diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 302918aadfb54a024ced8ddbd46153cad1ab8baf..af9243a893fb673dc195205d039d2f2dc4d75b26 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -44,9 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 
 public final class BuddyAllocator
-    implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapOomDebugDump {
+    implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapOomDebugDump {
   private final Arena[] arenas;
   private final AtomicInteger allocatedArenas = new AtomicInteger(0);
@@ -183,10 +185,16 @@ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal,
     metrics.incrAllocatedArena();
   }
 
-  // TODO: would it make sense to return buffers asynchronously?
+  @Override
   public void allocateMultiple(MemoryBuffer[] dest, int size)
       throws AllocatorOutOfMemoryException {
+    allocateMultiple(dest, size, null);
+  }
+
+  @Override
+  public void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+      throws AllocatorOutOfMemoryException {
     assert size > 0 : "size is " + size;
     if (size > maxAllocation) {
       throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
@@ -197,7 +205,7 @@ public void allocateMultiple(MemoryBuffer[] dest, int size)
     int allocLog2 = freeListIx + minAllocLog2;
     int allocationSize = 1 << allocLog2;
     // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
-    memoryManager.reserveMemory(dest.length << allocLog2);
+    memoryManager.reserveMemory(dest.length << allocLog2, isStopped);
     int destAllocIx = 0;
     for (int i = 0; i < dest.length; ++i) {
       if (dest[i] != null) continue;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index e331f1bdfdce294bd6c50984308a654a4ad451a3..e30acb02b836c39683762c7356387f194e007d8c 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -53,18 +54,26 @@ public LowLevelCacheMemoryManager(
     }
   }
 
+  public static class ReserveFailedException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    public ReserveFailedException(AtomicBoolean isStopped) {
+      super("Cannot reserve memory"
+          + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")
+          + ((isStopped != null && isStopped.get()) ? "; thread stopped" : ""));
"; thread stopped" : "")); + } + } @Override - public void reserveMemory(final long memoryToReserve) { - boolean result = reserveMemory(memoryToReserve, true); + public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) { + boolean result = reserveMemory(memoryToReserve, true, isStopped); if (result) return; // Can only happen if there's no evictor, or if thread is interrupted. - throw new RuntimeException("Cannot reserve memory" - + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")); + throw new ReserveFailedException(isStopped); } @VisibleForTesting - public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) { + public boolean reserveMemory(final long memoryToReserve, + boolean waitForEviction, AtomicBoolean isStopped) { // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point? int badCallCount = 0; long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve; @@ -100,6 +109,10 @@ public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction result = false; break; } + if (isStopped != null && isStopped.get()) { + result = false; + break; + } continue; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java index 0f4d3c01d7151d1994b5c7e5c7580391af161bca..e1133cdaf7600cb58e205837024815ada98e8bb5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicBoolean; + public interface MemoryManager extends LlapOomDebugDump { void releaseMemory(long memUsage); void updateMaxSize(long maxSize); /** TODO: temporary method until we get a better allocator. */ long forceReservedMemory(int allocationSize, int count); - void reserveMemory(long memoryToReserve); + void reserveMemory(long memoryToReserve, AtomicBoolean isStopped); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 0fd81397a2a546e0a00776eb6b123bed32031afb..655ce836d95bd79de4c0175408368b43cd4d0ed3 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.orc.TypeDescription; @@ -160,7 +161,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { * Contains only stripes that are read, and only columns included. null => read all RGs. 
    */
   private boolean[][] stripeRgs;
-  private volatile boolean isStopped = false;
+  private AtomicBoolean isStopped = new AtomicBoolean(false);
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
@@ -226,7 +227,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
   @Override
   public void stop() {
     LOG.debug("Encoded reader is being stopped");
-    isStopped = true;
+    isStopped.set(true);
   }
 
   @Override
@@ -330,6 +331,7 @@ protected Void performDataRead() throws IOException {
       DataWrapperForOrc dw = new DataWrapperForOrc();
       stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace);
       stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
+      stripeReader.setStopped(isStopped);
     } catch (Throwable t) {
       handleReaderError(startTime, t);
       return null;
@@ -383,7 +385,8 @@ protected Void performDataRead() throws IOException {
             orcReader.getSchema(), orcReader.getWriterVersion());
         counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs);
         if (hasFileId && metadataCache != null) {
-          OrcStripeMetadata newMetadata = metadataCache.putStripeMetadata(stripeMetadata);
+          OrcStripeMetadata newMetadata = metadataCache.putStripeMetadata(
+              stripeMetadata, isStopped);
           isFoundInCache = newMetadata != stripeMetadata; // May be cached concurrently.
           stripeMetadata = newMetadata;
           if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
@@ -510,7 +513,7 @@ private void validateFileMetadata() throws IOException {
   }
 
   private boolean processStop() {
-    if (!isStopped) return false;
+    if (!isStopped.get()) return false;
     LOG.info("Encoded data reader is stopping");
     tracePool.offer(trace);
     cleanupReaders();
@@ -620,7 +623,7 @@ private OrcFileMetadata getOrReadFileMetadata() throws IOException {
     // We assume this call doesn't touch HDFS because everything is already read; don't add time.
     metadata = new OrcFileMetadata(fileKey, orcReader);
     if (fileKey == null || metadataCache == null) return metadata;
-    return metadataCache.putFileMetadata(metadata);
+    return metadataCache.putFileMetadata(metadata, isStopped);
   }
 
   /**
@@ -649,7 +652,7 @@ private OrcFileMetadata getOrReadFileMetadata() throws IOException {
             orcReader.getWriterVersion());
         counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
         if (hasFileId && metadataCache != null) {
-          value = metadataCache.putStripeMetadata(value);
+          value = metadataCache.putStripeMetadata(value, isStopped);
           if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
             LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}",
                 stripeKey.stripeIx, DebugUtils.toString(globalInc));
@@ -862,7 +865,7 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
         return lowLevelCache.putFileData(
             fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
       } else if (metadataCache != null) {
-        metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
+        metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset, isStopped);
       }
       return null;
     }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index a088e27c82d2a856764b0b48f13361a75a281053..35d617874776269537f4363cf6f7c8ff329a2e4c 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -29,11 +29,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -66,6 +68,7 @@ import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -159,7 +162,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end
   private final Object fileKey;
   private final FileSystem fs;
-  private volatile boolean isStopped = false;
+  private AtomicBoolean isStopped = new AtomicBoolean(false);
   private final Deserializer sourceSerDe;
   private final InputFormat sourceInputFormat;
   private final Reporter reporter;
@@ -240,7 +243,7 @@ private static int determineAllocSize(BufferUsageManager bufferManager, Configur
   @Override
   public void stop() {
     LlapIoImpl.LOG.debug("Encoded reader is being stopped");
-    isStopped = true;
+    isStopped.set(true);
   }
 
   @Override
@@ -338,14 +341,16 @@ public String toCoordinateString() {
     private final Map<StreamName, OutputReceiver> streams = new HashMap<>();
     private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>();
     private final boolean doesSourceHaveIncludes;
+    private final AtomicBoolean isStopped;
 
     public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
-        boolean[] writerIncludes, boolean doesSourceHaveIncludes) {
+        boolean[] writerIncludes, boolean doesSourceHaveIncludes, AtomicBoolean isStopped) {
       this.bufferManager = bufferManager;
       assert writerIncludes != null; // Taken care of on higher level.
       this.writerIncludes = writerIncludes;
       this.doesSourceHaveIncludes = doesSourceHaveIncludes;
       this.columnIds = columnIds;
+      this.isStopped = isStopped;
       startStripe();
     }
@@ -433,7 +438,7 @@ public OutputReceiver createDataStream(StreamName name) throws IOException {
       if (LlapIoImpl.LOG.isTraceEnabled()) {
         LlapIoImpl.LOG.trace("Creating cache receiver for " + name);
       }
-      CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name);
+      CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, isStopped);
       or = cor;
       List<CacheOutputReceiver> list = colStreams.get(name.getColumn());
       if (list == null) {
@@ -567,10 +572,16 @@ public void setCurrentStripeOffsets(long currentKnownTornStart,
     private List<MemoryBuffer> buffers = null;
     private int lastBufferPos = -1;
     private boolean suppressed = false;
+    private final AtomicBoolean isStopped;
+    private final StoppableAllocator allocator;
 
-    public CacheOutputReceiver(BufferUsageManager bufferManager, StreamName name) {
+    public CacheOutputReceiver(
+        BufferUsageManager bufferManager, StreamName name, AtomicBoolean isStopped) {
       this.bufferManager = bufferManager;
+      Allocator alloc = bufferManager.getAllocator();
+      this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
       this.name = name;
+      this.isStopped = isStopped;
     }
 
     public void clear() {
@@ -585,6 +596,15 @@ public void suppress() {
       lastBufferPos = -1;
     }
 
+    private void allocateMultiple(MemoryBuffer[] dest, int size) {
+      if (allocator != null) {
+        allocator.allocateMultiple(dest, size, isStopped);
+      } else {
+        bufferManager.getAllocator().allocateMultiple(dest, size);
+      }
+    }
+
+
     @Override
     public void output(ByteBuffer buffer) throws IOException {
       // TODO: avoid put() by working directly in OutStream?
@@ -608,7 +628,7 @@ public void output(ByteBuffer buffer) throws IOException {
       boolean isNewBuffer = (lastBufferPos == -1);
       if (isNewBuffer) {
         MemoryBuffer[] dest = new MemoryBuffer[1];
-        bufferManager.getAllocator().allocateMultiple(dest, size);
+        allocateMultiple(dest, size);
         LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0];
         bb = newBuffer.getByteBufferRaw();
         lastBufferPos = bb.position();
@@ -1384,7 +1404,7 @@ public void startReadSplitFromFile(
     // TODO: move this into ctor? EW would need to create CacheWriter then
     List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
     writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
-        writer.isOnlyWritingIncludedColumns()), daemonConf, split.getPath());
+        writer.isOnlyWritingIncludedColumns(), isStopped), daemonConf, split.getPath());
     if (writer instanceof VectorDeserializeOrcWriter) {
       VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer;
       asyncWriter.startAsync(new AsyncCacheDataCallback());
@@ -1640,7 +1660,7 @@ private void recordReaderTime(long startTime) {
   }
 
   private boolean processStop() {
-    if (!isStopped) return false;
+    if (!isStopped.get()) return false;
     LlapIoImpl.LOG.info("SerDe-based data reader is stopping");
     cleanup(true);
     return true;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index 601b622b4967b9192afdbf8e3cde1ef3492c875b..6c81e5bd0b43c0ba5aeb942bab4803b58e54963f 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -49,17 +50,17 @@ public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy,
         ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null;
   }
 
-  public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) {
+  public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData, AtomicBoolean isStopped) {
     long memUsage = metaData.getMemoryUsage();
-    memoryManager.reserveMemory(memUsage);
+    memoryManager.reserveMemory(memUsage, isStopped);
     OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileKey(), metaData);
     // See OrcFileMetadata; it is always unlocked, so we just "touch" it here to simulate use.
     return touchOnPut(metaData, val, memUsage);
   }
 
-  public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData) {
+  public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData, AtomicBoolean isStopped) {
     long memUsage = metaData.getMemoryUsage();
-    memoryManager.reserveMemory(memUsage);
+    memoryManager.reserveMemory(memUsage, isStopped);
     OrcStripeMetadata val = stripeMetadata.putIfAbsent(metaData.getKey(), metaData);
     // See OrcStripeMetadata; it is always unlocked, so we just "touch" it here to simulate use.
     return touchOnPut(metaData, val, memUsage);
@@ -78,7 +79,8 @@ public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData) {
   }
 
-  public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) {
+  public void putIncompleteCbs(
+      Object fileKey, DiskRange[] ranges, long baseOffset, AtomicBoolean isStopped) {
     if (estimateErrors == null) return;
     OrcFileEstimateErrors errorData = estimateErrors.get(fileKey);
     boolean isNew = false;
@@ -90,7 +92,7 @@ public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset
       errorData.addError(range.getOffset(), range.getLength(), baseOffset);
     }
     long memUsage = errorData.estimateMemoryUsage();
-    memoryManager.reserveMemory(memUsage);
+    memoryManager.reserveMemory(memUsage, isStopped);
     OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData);
     if (old != null) {
       errorData = old;
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index a6080e63fa90ce09a17cd245729174183a5e9769..390b34b97afb71309b15f87885e2de9a40f9e37a 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
@@ -58,7 +59,7 @@ public TestBuddyAllocator(boolean direct, boolean mmap) {
   private static class DummyMemoryManager implements MemoryManager {
     @Override
-    public void reserveMemory(long memoryToReserve) {
+    public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
     }
 
     @Override
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 0cce624682cd5673d372e5c2098c7b0b9a343d79..210cbb006ef2ca0e9cc036bfffde618cafba16a4 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -86,7 +86,7 @@ public void run() {
         listLock.unlock();
       }
       // Now try to evict with locked buffer still in the list.
-      mm.reserveMemory(1, false);
+      mm.reserveMemory(1, false, null);
       assertSame(buffer2, et.evicted.get(0));
       unlock(lrfu, buffer1);
     }
@@ -198,7 +198,7 @@ public void testDeadlockResolution() {
     // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
     LlapDataBuffer locked = inserted.get(0);
     lock(lrfu, locked);
-    mm.reserveMemory(1, false);
+    mm.reserveMemory(1, false, null);
     LlapDataBuffer evicted = et.evicted.get(0);
     assertNotNull(evicted);
     assertTrue(evicted.isInvalid());
@@ -264,7 +264,7 @@ private LlapDataBuffer cacheSizeTwoFake(EvictionTracker et, LowLevelLrfuCachePol
   // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
   public boolean cache(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy lrfu,
       EvictionTracker et, LlapDataBuffer buffer) {
-    if (mm != null && !mm.reserveMemory(1, false)) {
+    if (mm != null && !mm.reserveMemory(1, false, null)) {
       return false;
     }
     buffer.incRef();
@@ -353,7 +353,7 @@ private void testHeapSize(int heapSize) {
       lock(lrfu, buf);
     }
     assertEquals(heapSize, m.cacheUsed.get());
-    assertFalse(mm.reserveMemory(1, false));
+    assertFalse(mm.reserveMemory(1, false, null));
     if (!et.evicted.isEmpty()) {
       assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
     }
@@ -378,13 +378,13 @@ private void verifyOrder(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy
     // Evict all blocks.
     et.evicted.clear();
     for (int i = 0; i < inserted.size(); ++i) {
-      assertTrue(mm.reserveMemory(1, false));
+      assertTrue(mm.reserveMemory(1, false, null));
       if (cacheUsed != null) {
         assertEquals(inserted.size(), cacheUsed.get());
       }
     }
     // The map should now be empty.
-    assertFalse(mm.reserveMemory(1, false));
+    assertFalse(mm.reserveMemory(1, false, null));
     if (cacheUsed != null) {
       assertEquals(inserted.size(), cacheUsed.get());
     }
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 3059382942a0faaaa4f920c9e12a880e4cca4797..1d5954e85964715f5dfb1817a581325b87c2f6e8 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -19,6 +19,8 @@
 
 import static org.junit.Assert.*;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
@@ -76,7 +78,7 @@ public void debugDumpShort(StringBuilder sb) {
     int allocs = 0;
 
     @Override
-    public void reserveMemory(long memoryToReserve) {
+    public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
       ++allocs;
     }
 
@@ -110,31 +112,31 @@ public void testGetPut() throws Exception {
     DummyCachePolicy cp = new DummyCachePolicy();
     OrcMetadataCache cache = new OrcMetadataCache(mm, cp, false);
     OrcFileMetadata ofm1 = OrcFileMetadata.createDummy(1), ofm2 = OrcFileMetadata.createDummy(2);
-    assertSame(ofm1, cache.putFileMetadata(ofm1));
+    assertSame(ofm1, cache.putFileMetadata(ofm1, null));
     assertEquals(1, mm.allocs);
     cp.verifyEquals(1);
-    assertSame(ofm2, cache.putFileMetadata(ofm2));
+    assertSame(ofm2, cache.putFileMetadata(ofm2, null));
     assertEquals(2, mm.allocs);
     cp.verifyEquals(2);
     assertSame(ofm1, cache.getFileMetadata(1));
     assertSame(ofm2, cache.getFileMetadata(2));
     cp.verifyEquals(4);
     OrcFileMetadata ofm3 = OrcFileMetadata.createDummy(1);
-    assertSame(ofm1, cache.putFileMetadata(ofm3));
+    assertSame(ofm1, cache.putFileMetadata(ofm3, null));
     assertEquals(2, mm.allocs);
     cp.verifyEquals(5);
     assertSame(ofm1, cache.getFileMetadata(1));
     cp.verifyEquals(6);
     OrcStripeMetadata osm1 = OrcStripeMetadata.createDummy(1), osm2 = OrcStripeMetadata.createDummy(2);
-    assertSame(osm1, cache.putStripeMetadata(osm1));
+    assertSame(osm1, cache.putStripeMetadata(osm1, null));
     assertEquals(3, mm.allocs);
-    assertSame(osm2, cache.putStripeMetadata(osm2));
+    assertSame(osm2, cache.putStripeMetadata(osm2, null));
     assertEquals(4, mm.allocs);
     assertSame(osm1, cache.getStripeMetadata(osm1.getKey()));
     assertSame(osm2, cache.getStripeMetadata(osm2.getKey()));
     OrcStripeMetadata osm3 = OrcStripeMetadata.createDummy(1);
-    assertSame(osm1, cache.putStripeMetadata(osm3));
+    assertSame(osm1, cache.putStripeMetadata(osm3, null));
     assertEquals(4, mm.allocs);
     assertSame(osm1, cache.getStripeMetadata(osm3.getKey()));
     cp.verifyEquals(12);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
index 7540e72b5392fe6a5d67410c68f26a5d62435541..4324c860b17706fc49751997e53437b0a3d91b38 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.orc.StripeInformation;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
@@ -54,4 +55,6 @@ void readEncodedColumns(int stripeIx, StripeInformation stripe,
    * to just checking the constant in the first place. */
   void setTracing(boolean isEnabled);
+
+  void setStopped(AtomicBoolean isStopped);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 3ef03ea35a0ca48119efa3e5e4a2d63ee6217b6d..5e718c3763f0ddb26e486be32de93dfae3172ba2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -24,11 +24,13 @@ import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -126,6 +128,8 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end
   private final DataCache cacheWrapper;
   private boolean isTracingEnabled;
   private final IoTrace trace;
+  private AtomicBoolean isStopped;
+  private StoppableAllocator allocator;
 
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec,
       int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
@@ -136,6 +140,8 @@ public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionC
     this.bufferSize = bufferSize;
     this.rowIndexStride = strideRate;
     this.cacheWrapper = cacheWrapper;
+    Allocator alloc = cacheWrapper.getAllocator();
+    this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
     this.dataReader = dataReader;
     this.trace = trace;
     if (POOLS != null) return;
@@ -805,7 +811,7 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon
       targetBuffers[ix] = chunk.getBuffer();
       ++ix;
     }
-    cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize);
+    allocateMultiple(targetBuffers, bufferSize);
 
     // 4. Now decompress (or copy) the data into cache buffers.
     for (ProcCacheChunk chunk : toDecompress) {
@@ -1067,8 +1073,7 @@ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList s
       cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
       ++ix;
     }
-    cacheWrapper.getAllocator().allocateMultiple(
-        targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
+    allocateMultiple(targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
 
     // 4. Now copy the data into cache buffers.
     ix = 0;
@@ -1120,7 +1125,7 @@ private CacheChunk copyAndReplaceCandidateToNonCached(
     // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
     singleAlloc[0] = null;
     trace.logPartialUncompressedData(partOffset, candidateEnd, true);
-    cacheWrapper.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
+    allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
     MemoryBuffer buffer = singleAlloc[0];
     cacheWrapper.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
@@ -1130,11 +1135,19 @@ private CacheChunk copyAndReplaceCandidateToNonCached(
     return tcc;
   }
 
+  private void allocateMultiple(MemoryBuffer[] dest, int size) {
+    if (allocator != null) {
+      allocator.allocateMultiple(dest, size, isStopped);
+    } else {
+      cacheWrapper.getAllocator().allocateMultiple(dest, size);
+    }
+  }
+
   private CacheChunk copyAndReplaceUncompressedToNonCached(
       BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
     singleAlloc[0] = null;
     trace.logPartialUncompressedData(bc.getOffset(), bc.getEnd(), false);
-    cacheWrapper.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
+    allocateMultiple(singleAlloc, bc.getLength());
     MemoryBuffer buffer = singleAlloc[0];
     cacheWrapper.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
@@ -1706,4 +1719,9 @@ public void resetBeforeOffer(ColumnStreamData t) {
     });
   }
 }
+
+  @Override
+  public void setStopped(AtomicBoolean isStopped) {
+    this.isStopped = isStopped;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
new file mode 100644
index 0000000000000000000000000000000000000000..2172bd209678a184de10189d3eac00365799de1d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+public interface StoppableAllocator extends Allocator {
+  /** Stoppable allocate method specific to branch-2. */
+  void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+      throws AllocatorOutOfMemoryException;
+}
\ No newline at end of file
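
Note: the patch threads a single AtomicBoolean from the fragment reader's stop() call down
to LowLevelCacheMemoryManager.reserveMemory(), so an allocation that is waiting on cache
eviction can observe the stop and fail fast with ReserveFailedException instead of spinning
until interrupted. A minimal caller-side sketch follows (variable names like cacheWrapper
and allocSize are hypothetical, not from the patch; the instanceof fallback mirrors what
CacheOutputReceiver and EncodedReaderImpl do above):

    // Hypothetical usage sketch, not part of the patch.
    AtomicBoolean isStopped = new AtomicBoolean(false); // shared with the stop() path
    Allocator alloc = cacheWrapper.getAllocator();      // cacheWrapper: some DataCache
    MemoryBuffer[] dest = new MemoryBuffer[1];
    try {
      if (alloc instanceof StoppableAllocator) {
        // Stop-aware path: reserveMemory() re-checks isStopped between eviction attempts.
        ((StoppableAllocator) alloc).allocateMultiple(dest, allocSize, isStopped);
      } else {
        alloc.allocateMultiple(dest, allocSize); // legacy allocators are not stop-aware
      }
    } catch (LowLevelCacheMemoryManager.ReserveFailedException e) {
      // The reservation gave up: the thread was interrupted or isStopped was set.
    }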
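The switch of the isStopped fields from volatile boolean to AtomicBoolean is what makes this
work: the flag is now an object that can be handed by reference through CacheWriter,
CacheOutputReceiver, and EncodedReaderImpl, whereas a primitive would be copied at each call
boundary and a later stop() would go unseen. The new exit in reserveMemory() then amounts to
one extra condition, shown here as a condensed standalone model (tryEvict() is a hypothetical
stand-in for the real eviction and accounting logic, not a method in this patch):

    // Condensed model of the changed loop, assuming tryEvict(n) frees up to n
    // bytes and returns the amount actually freed.
    boolean reserveMemory(long memoryToReserve, boolean waitForEviction, AtomicBoolean isStopped) {
      long remaining = memoryToReserve;
      while (remaining > 0) {
        long freed = tryEvict(remaining);
        remaining -= freed;
        if (freed == 0) {
          if (!waitForEviction) return false;                       // caller will not wait
          if (Thread.currentThread().isInterrupted()) return false; // pre-existing exit
          if (isStopped != null && isStopped.get()) return false;   // new exit in this patch
        }
      }
      return true;
    }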