diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java index 3758de1a2a..ff72617665 100644 --- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java @@ -89,6 +89,11 @@ public class SegmentWriteQueue implements Closeable { queue.put(segment); } catch (InterruptedException e1) { log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1); + + synchronized (segmentsByUUID) { + segmentsByUUID.remove(segment.getUuid()); + segmentsByUUID.notifyAll(); + } } } } @@ -109,7 +114,7 @@ public class SegmentWriteQueue implements Closeable { private void consume(SegmentWriteAction segment) throws SegmentConsumeException { try { segment.passTo(writer); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { setBroken(true); throw new SegmentConsumeException(segment, e); } @@ -271,7 +276,7 @@ public class SegmentWriteQueue implements Closeable { private final SegmentWriteAction segment; - public SegmentConsumeException(SegmentWriteAction segment, IOException cause) { + public SegmentConsumeException(SegmentWriteAction segment, Exception cause) { super(cause); this.segment = segment; } diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java new file mode 100644 index 0000000000..865efd882a --- /dev/null +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@Internal(since = "1.0.0") +@Version("1.0.0") +package org.apache.jackrabbit.oak.segment.remote.queue; + +import org.apache.jackrabbit.oak.commons.annotations.Internal; +import org.osgi.annotation.versioning.Version; \ No newline at end of file diff --git a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java index 7c6ad11c74..be1969f41f 100644 --- a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java +++ b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -281,6 +282,50 @@ public class SegmentWriteQueueTest { } } + @Test + public void testRuntimeExceptionInSegmentConsumer() throws InterruptedException, NoSuchFieldException, IOException { + + Set added = Collections.synchronizedSet(new HashSet<>()); + AtomicBoolean doBreak = new AtomicBoolean(true); + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + //simulate runtime exception that can happen while writing to the remote repository + if (doBreak.get()) { + throw new RuntimeException(); + } + + added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb())); + }); + + queue.addToQueue(tarEntry(0), EMPTY_DATA, 0, 0); + queue.addToQueue(tarEntry(1), EMPTY_DATA, 0, 0); + queue.addToQueue(tarEntry(2), EMPTY_DATA, 0, 0); + + AtomicBoolean flushFinished = new AtomicBoolean(false); + Thread flusher = new Thread(() -> { + try { + queue.flush(); + flushFinished.set(true); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + flusher.start(); + + Thread.sleep(100); + + assertFalse("Flush thread should not be finished", flushFinished.get()); + assertEquals(0, added.size()); + + //Stop throwing runtime exception + doBreak.set(false); + + //give enough time to emergency thread to wake up + Thread.sleep(1200); + + assertTrue("Segment queue should be empty", flushFinished.get()); + assertEquals(3, added.size()); + } + private static RemoteSegmentArchiveEntry tarEntry(long i) { return new RemoteSegmentArchiveEntry(0, i, 0, 0, 0, 0, false); }