diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java index 54c8329e04..2c7345ddca 100644 --- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java @@ -19,22 +19,20 @@ package org.apache.jackrabbit.oak.segment.azure.queue; import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry; import org.junit.After; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.FieldSetter; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; public class SegmentWriteQueueTest { @@ -42,11 +40,66 @@ public class SegmentWriteQueueTest { private SegmentWriteQueue queue; + private SegmentWriteQueue queueBlocked; + @After public void shutdown() throws IOException { if (queue != null) { queue.close(); } + + if (queueBlocked != null) { + queueBlocked.close(); + } + } + + + @Test + public void testThreadInterruptedWhileAddigToQueue() throws IOException, InterruptedException, NoSuchFieldException { + + Set added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + + + BlockingDeque queue = Mockito.mock(BlockingDeque.class); + + queueBlocked = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb())); + }); + + FieldSetter.setField(queueBlocked, queueBlocked.getClass().getDeclaredField("queue"), queue); + Mockito.when(queue.offer(any(SegmentWriteAction.class), anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + + try { + queueBlocked.addToQueue(tarEntry(0), EMPTY_DATA, 0, 0); + } catch (IOException e) { + System.out.println(); + } + System.out.println(); + + semaphore.release(Integer.MAX_VALUE); + + AtomicBoolean flushFinished = new AtomicBoolean(false); + Set addedAfterFlush = new HashSet<>(); + Thread flusher = new Thread(() -> { + try { + queueBlocked.flush(); + flushFinished.set(true); + addedAfterFlush.addAll(added); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + flusher.start(); + + Thread.sleep(1000); + + assertEquals("Flush thread should have been completed till now", Thread.State.TERMINATED, flusher.getState()); + assertTrue("Segment queue is empty", flushFinished.get()); } @Test