diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index 341da252c4..3344800f31 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.llap.cache; +import java.nio.channels.ClosedByInterruptException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -217,7 +218,11 @@ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, } int initCount = doPreallocate && !isMapped ? maxArenas : 1; for (int i = 0; i < initCount; ++i) { - arenas[i].init(i); + try { + arenas[i].init(i); + } catch (ClosedByInterruptException e) { + throw new RuntimeException("Failed pre-allocating buddy allocator arena. ", e); + } metrics.incrAllocatedArena(); } allocatedArenas.set(initCount); @@ -851,7 +856,7 @@ public boolean isDirectAlloc() { return isDirect; } - private ByteBuffer preallocateArenaBuffer(int arenaSize) { + private ByteBuffer preallocateArenaBuffer(int arenaSize) throws ClosedByInterruptException { if (isMapped) { RandomAccessFile rwf = null; File rf = null; @@ -863,6 +868,15 @@ private ByteBuffer preallocateArenaBuffer(int arenaSize) { // Use RW, not PRIVATE because the copy-on-write is irrelevant for a deleted file // see discussion in YARN-5551 for the memory accounting discussion return rwf.getChannel().map(MapMode.READ_WRITE, 0, arenaSize); + } catch (ClosedByInterruptException cbi) { + LlapIoImpl.LOG.warn("Interrupted while trying to allocate memory mapped arena", cbi); + // finally may not execute on thread interrupts so cleanup the arena file as it may be unmapped + IOUtils.closeQuietly(rwf); + if (rf != null) { + rf.delete(); + rf = null; + } + throw cbi; } catch (IOException ioe) { LlapIoImpl.LOG.warn("Failed trying to allocate memory mapped arena", ioe); // fail similarly when memory allocations fail @@ -892,7 +906,7 @@ private ByteBuffer preallocateArenaBuffer(int arenaSize) { private byte[] headers; // Free list indices of each unallocated block, for quick lookup. private FreeList[] freeLists; - void init(int arenaIx) { + void init(int arenaIx) throws ClosedByInterruptException { this.arenaIx = arenaIx; try { data = preallocateArenaBuffer(arenaSize); @@ -1453,13 +1467,24 @@ private int allocateWithExpand( continue; // CAS race, look again. } assert data == null; - init(arenaIx); - boolean isCommited = allocatedArenas.compareAndSet(-arenaCount - 1, arenaCount + 1); - assert isCommited; + try { + init(arenaIx); + // if init did not throw interrupt exception then allocation succeeded and so increment and commit the arena + boolean isCommited = allocatedArenas.compareAndSet(-arenaCount - 1, arenaCount + 1); + assert isCommited; + metrics.incrAllocatedArena(); + } catch (ClosedByInterruptException e) { + LlapIoImpl.LOG.info("Received interrupt during arena {} allocation.. Ignoring..", arenaIx); + // not doing the notify in finally() block as thread interruptions may not execute finally + synchronized (this) { + this.notifyAll(); + } + continue; + } + synchronized (this) { this.notifyAll(); } - metrics.incrAllocatedArena(); return allocateWithSplit(freeListIx, dest, null, ix, dest.length, size, -1); } }