Index: oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java =================================================================== --- oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java (revision 1876189) +++ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java (working copy) @@ -168,6 +168,7 @@ throw new IOException("Can't add segment to the queue"); } } catch (InterruptedException e) { + segmentsByUUID.remove(action.getUuid()); throw new IOException(e); } finally { flushLock.readLock().unlock(); Index: oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java =================================================================== --- oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java (revision 1876189) +++ oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java (working copy) @@ -19,6 +19,8 @@ import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry; import org.junit.After; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.FieldSetter; import java.io.IOException; import java.io.UncheckedIOException; @@ -28,7 +30,9 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; @@ -35,6 +39,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; public class SegmentWriteQueueTest { @@ -42,14 +49,66 @@ private SegmentWriteQueue queue; + private SegmentWriteQueue queueBlocked; + @After public void shutdown() throws IOException { if (queue != null) { queue.close(); } + + if (queueBlocked != null) { + queueBlocked.close(); + } } @Test + public void testThreadInterruptedWhileAddigToQueue() throws InterruptedException, NoSuchFieldException { + + Set added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + + + BlockingDeque queue = Mockito.mock(BlockingDeque.class); + + queueBlocked = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb())); + }); + + FieldSetter.setField(queueBlocked, queueBlocked.getClass().getDeclaredField("queue"), queue); + Mockito.when(queue.offer(any(SegmentWriteAction.class), anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + + try { + queueBlocked.addToQueue(tarEntry(0), EMPTY_DATA, 0, 0); + fail("IOException should have been thrown"); + } catch (IOException e) { + assertEquals(e.getCause().getClass(), InterruptedException.class); + } + + semaphore.release(Integer.MAX_VALUE); + + AtomicBoolean flushFinished = new AtomicBoolean(false); + Thread flusher = new Thread(() -> { + try { + queueBlocked.flush(); + flushFinished.set(true); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + flusher.start(); + + Thread.sleep(1000); + + assertEquals("Flush thread should have been completed till now", Thread.State.TERMINATED, flusher.getState()); + assertTrue("Segment queue is empty", flushFinished.get()); + } + + @Test public void testQueue() throws IOException, InterruptedException { Set added = Collections.synchronizedSet(new HashSet<>()); Semaphore semaphore = new Semaphore(0);