diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java index 86a7dbdf74..c297619345 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java @@ -168,6 +168,7 @@ public class SegmentWriteQueue implements Closeable { throw new IOException("Can't add segment to the queue"); } } catch (InterruptedException e) { + segmentsByUUID.remove(action.getUuid()); throw new IOException(e); } finally { flushLock.readLock().unlock(); diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java index 54c8329e04..f5ff9fb57c 100644 --- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java @@ -19,22 +19,19 @@ package org.apache.jackrabbit.oak.segment.azure.queue; import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry; import org.junit.After; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.FieldSetter; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; public class SegmentWriteQueueTest { @@ -42,11 +39,66 @@ public class SegmentWriteQueueTest { private SegmentWriteQueue queue; + private SegmentWriteQueue queueBlocked; + @After public void shutdown() throws IOException { if (queue != null) { queue.close(); } + + if (queueBlocked != null) { + queueBlocked.close(); + } + } + + + @Test + public void testThreadInterruptedWhileAddigToQueue() throws IOException, InterruptedException, NoSuchFieldException { + + Set added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + + + BlockingDeque queue = Mockito.mock(BlockingDeque.class); + + queueBlocked = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb())); + }); + + FieldSetter.setField(queueBlocked, queueBlocked.getClass().getDeclaredField("queue"), queue); + Mockito.when(queue.offer(any(SegmentWriteAction.class), anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + + try { + queueBlocked.addToQueue(tarEntry(0), EMPTY_DATA, 0, 0); + fail("IOException should have been thrown"); + } catch (IOException e) { + assertEquals(e.getCause().getClass(), InterruptedException.class); + } + + semaphore.release(Integer.MAX_VALUE); + + AtomicBoolean flushFinished = new AtomicBoolean(false); + Set addedAfterFlush = new HashSet<>(); + Thread flusher = new Thread(() -> { + try { + queueBlocked.flush(); + flushFinished.set(true); + addedAfterFlush.addAll(added); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + flusher.start(); + + Thread.sleep(1000); + + assertEquals("Flush thread should have been completed till now", Thread.State.TERMINATED, flusher.getState()); + assertTrue("Segment queue is empty", flushFinished.get()); } @Test