diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index fcfc22a712..b0989b69fd 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -320,17 +320,24 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0); destAllocIx = allocateFromDiscardResult( dest, destAllocIx, freeListIx, allocationSize, ctx); - if (destAllocIx == dest.length) return; } if (hasDiscardedAny) { discardFailed = 0; } else if (++discardFailed > MAX_DISCARD_ATTEMPTS) { + isFailed = true; + // Ensure all-or-nothing allocation. + for (int i = 0; i < destAllocIx; ++i) { + try { + deallocate(dest[i]); + } catch (Throwable t) { + LlapIoImpl.LOG.info("Failed to deallocate after a partially successful allocate: " + dest[i], t); + } + } String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + dest.length + " (entire cache is fragmented and locked, or an internal issue)"; logOomErrorMessage(msg); - isFailed = true; throw new AllocatorOutOfMemoryException(msg); } ++attempt; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 348f9df773..d04c58b912 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -431,7 +431,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, trace.logStartCol(ctx.colIx); for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { StreamContext sctx = ctx.streams[streamIx]; - ColumnStreamData cb; + ColumnStreamData cb = null; try { if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == 
null) { // This stream is for entire stripe and needed for every RG; uncompress once and reuse. @@ -443,7 +443,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, trace.logStartStripeStream(sctx.kind); sctx.stripeLevelStream = POOLS.csdPool.take(); // We will be using this for each RG while also sending RGs to processing. - // To avoid buffers being unlocked, run refcount one ahead; so each RG + // To avoid buffers being unlocked, run refcount one ahead; so each RG // processing will decref once, and the last one will unlock the buffers. sctx.stripeLevelStream.incRef(); // For stripe-level streams we don't need the extra refcount on the block. @@ -482,13 +482,18 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, sctx.bufferIter = iter = lastCached; } } - ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb); } catch (Exception ex) { DiskRangeList drl = toRead == null ? null : toRead.next; LOG.error("Error getting stream [" + sctx.kind + ", " + ctx.encoding + "] for" + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length + "; toRead " + RecordReaderUtils.stringifyDiskRanges(drl), ex); throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); + } finally { + // Always add stream data to ecb; releaseEcbRefCountsOnError relies on it. + // Otherwise, we won't release consumer refcounts for a partially read stream. + if (cb != null) { + ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb); + } } } } @@ -670,6 +675,7 @@ private void releaseInitialRefcounts(DiskRangeList current) { if (toFree instanceof ProcCacheChunk) { ProcCacheChunk pcc = (ProcCacheChunk)toFree; if (pcc.originalData != null) { + // TODO: can this still happen? we now clean these up explicitly to avoid other issues. // This can only happen in case of failure - we read some data, but didn't decompress // it. Deallocate the buffer directly, do not decref. 
if (pcc.getBuffer() != null) { @@ -677,7 +683,6 @@ private void releaseInitialRefcounts(DiskRangeList current) { } continue; } - } if (!(toFree instanceof CacheChunk)) continue; CacheChunk cc = (CacheChunk)toFree; @@ -890,35 +895,71 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon targetBuffers[ix] = chunk.getBuffer(); ++ix; } - cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize, - cacheWrapper.getDataBufferFactory()); + boolean isAllocated = false; + try { + cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize, + cacheWrapper.getDataBufferFactory()); + isAllocated = true; + } finally { + // toDecompress/targetBuffers contents are actually already added to some structures that + // will be cleaned up on error. Remove the unallocated buffers; keep the cached buffers in. + if (!isAllocated) { + // Inefficient - this only happens during cleanup on errors. + for (MemoryBuffer buf : targetBuffers) { + csd.getCacheBuffers().remove(buf); + } + for (ProcCacheChunk chunk : toDecompress) { + chunk.buffer = null; + } + } + } // 4. Now decompress (or copy) the data into cache buffers. 
- for (ProcCacheChunk chunk : toDecompress) { - ByteBuffer dest = chunk.getBuffer().getByteBufferRaw(); - if (chunk.isOriginalDataCompressed) { - boolean isOk = false; - try { - decompressChunk(chunk.originalData, codec, dest); - isOk = true; - } finally { - if (!isOk) { - isCodecFailure = true; + int decompressedIx = 0; + try { + while (decompressedIx < toDecompress.size()) { + ProcCacheChunk chunk = toDecompress.get(decompressedIx); + ByteBuffer dest = chunk.getBuffer().getByteBufferRaw(); + if (chunk.isOriginalDataCompressed) { + boolean isOk = false; + try { + decompressChunk(chunk.originalData, codec, dest); + isOk = true; + } finally { + if (!isOk) { + isCodecFailure = true; + } } + } else { + copyUncompressedChunk(chunk.originalData, dest); } - } else { - copyUncompressedChunk(chunk.originalData, dest); - } - if (isTracingEnabled) { - LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)"); + if (isTracingEnabled) { + LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)"); + } + // After we set originalData to null, we incref the buffer and the cleanup would decref it. + // Note that this assumes the failure during incref means incref didn't occur. + try { + cacheWrapper.reuseBuffer(chunk.getBuffer()); + } finally { + chunk.originalData = null; + } + ++decompressedIx; } - // After we set originalData to null, we incref the buffer and the cleanup would decref it. - // Note that this assumes the failure during incref means incref didn't occur. - try { - cacheWrapper.reuseBuffer(chunk.getBuffer()); - } finally { - chunk.originalData = null; + } finally { + // This will only execute on error. Deallocate the remaining allocated buffers explicitly. + // The ones that were already incref-ed will be cleaned up with the regular cache buffers. 
+ while (decompressedIx < toDecompress.size()) { + ProcCacheChunk chunk = toDecompress.get(decompressedIx); + csd.getCacheBuffers().remove(chunk.getBuffer()); + try { + cacheWrapper.getAllocator().deallocate(chunk.getBuffer()); + } catch (Throwable t) { + LOG.error("Ignoring the cleanup error after another error", t); + } + chunk.setBuffer(null); + // Advance to the next chunk; otherwise this cleanup loop never terminates. + ++decompressedIx; } } @@ -959,7 +1000,7 @@ private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset, if (current instanceof CacheChunk) { // 2a. This is a decoded compression buffer, add as is. CacheChunk cc = (CacheChunk)current; - if (isTracingEnabled) { + if (isTracingEnabled) { // TODO# HERE unaccompanied lock LOG.trace("Locking " + cc.getBuffer() + " due to reuse"); } cacheWrapper.reuseBuffer(cc.getBuffer()); @@ -1052,7 +1093,7 @@ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffse * to handle just for this case. * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our * allocator. Uncompressed case is not mainline though so let's not complicate it. - * @param kind + * @param kind */ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start, long streamOffset, long streamEnd, Kind kind) throws IOException { @@ -1564,7 +1605,7 @@ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers, true); if (compressed.remaining() <= 0 && toRelease.remove(compressed)) { - releaseBuffer(compressed, true); // We copied the entire buffer. + releaseBuffer(compressed, true); // We copied the entire buffer. } // else there's more data to process; will be handled in next call. 
return cc; } @@ -2017,7 +2058,7 @@ private DiskRangeList preReadUncompressedStreams(long stripeOffset, ReadContext[ hasError = false; } finally { // At this point, everything in the list is going to have a refcount of one. Unless it - // failed between the allocation and the incref for a single item, we should be ok. + // failed between the allocation and the incref for a single item, we should be ok. if (hasError) { try { releaseInitialRefcounts(toRead.next);