diff --git a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java index 7c6ad11c74..be1969f41f 100644 --- a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java +++ b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -281,6 +282,50 @@ public class SegmentWriteQueueTest { } } + @Test + public void testRuntimeExceptionInSegmentConsumer() throws InterruptedException, NoSuchFieldException, IOException { + + Set added = Collections.synchronizedSet(new HashSet<>()); + AtomicBoolean doBreak = new AtomicBoolean(true); + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + //simulate runtime exception that can happen while writing to the remote repository + if (doBreak.get()) { + throw new RuntimeException(); + } + + added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb())); + }); + + queue.addToQueue(tarEntry(0), EMPTY_DATA, 0, 0); + queue.addToQueue(tarEntry(1), EMPTY_DATA, 0, 0); + queue.addToQueue(tarEntry(2), EMPTY_DATA, 0, 0); + + AtomicBoolean flushFinished = new AtomicBoolean(false); + Thread flusher = new Thread(() -> { + try { + queue.flush(); + flushFinished.set(true); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + flusher.start(); + + Thread.sleep(100); + + assertFalse("Flush thread should not be finished", flushFinished.get()); + assertEquals(0, added.size()); + + //Stop throwing runtime exception + doBreak.set(false); + + //give enough time to emergency thread to wake up + Thread.sleep(1200); + + assertTrue("Segment queue should be empty", flushFinished.get()); + assertEquals(3, added.size()); + } + private static RemoteSegmentArchiveEntry tarEntry(long i) { return new RemoteSegmentArchiveEntry(0, i, 0, 0, 0, 0, false); }