diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java index df5c52d7eb..edfdac97f8 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java @@ -62,6 +62,15 @@ import org.slf4j.LoggerFactory; */ public class SegmentNodeStore implements NodeStore, Observable { + private static final Closeable NOOP = new Closeable() { + + @Override + public void close() { + // This method was intentionally left blank. + } + + }; + public static class SegmentNodeStoreBuilder { private static final Logger LOG = LoggerFactory.getLogger(SegmentNodeStoreBuilder.class); @@ -177,7 +186,11 @@ public class SegmentNodeStore implements NodeStore, Observable { @Override public Closeable addObserver(Observer observer) { - return scheduler.addObserver(observer); + if (scheduler instanceof Observable) { + return ((Observable) scheduler).addObserver(observer); + } + + return NOOP; } @Override @Nonnull diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java index c587584706..07c6a5b186 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java @@ -18,17 +18,12 @@ package org.apache.jackrabbit.oak.segment.scheduler; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.System.currentTimeMillis; import static java.lang.Thread.currentThread; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.jackrabbit.oak.api.Type.LONG; import java.io.Closeable; import java.util.Map; import java.util.Map.Entry; -import java.util.Random; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Semaphore; @@ -47,6 +42,7 @@ import org.apache.jackrabbit.oak.segment.SegmentOverflowException; import org.apache.jackrabbit.oak.segment.SegmentReader; import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.Observable; import org.apache.jackrabbit.oak.spi.commit.Observer; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -54,7 +50,7 @@ import org.apache.jackrabbit.oak.stats.StatisticsProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class LockBasedScheduler implements Scheduler { +public class LockBasedScheduler implements Scheduler, Observable { private static final Closeable NOOP = new Closeable() { @Override @@ -73,19 +69,18 @@ public class LockBasedScheduler implements Scheduler { @Nonnull private StatisticsProvider statsProvider = StatisticsProvider.NOOP; - + private boolean dispatchChanges = true; - private long maximumBackoff = MILLISECONDS.convert(10, SECONDS); - - private LockBasedSchedulerBuilder(@Nonnull Revisions revisions, - @Nonnull SegmentReader reader) { + private LockBasedSchedulerBuilder(@Nonnull Revisions revisions, @Nonnull SegmentReader reader) { this.revisions = revisions; this.reader = reader; } - + /** - * {@link StatisticsProvider} for collecting statistics related to SegmentStore + * {@link StatisticsProvider} for collecting statistics related to + * SegmentStore + * * @param statisticsProvider * @return this instance */ @@ -94,26 +89,20 @@ public class LockBasedScheduler implements Scheduler { this.statsProvider = checkNotNull(statisticsProvider); return this; } - + @Nonnull public LockBasedSchedulerBuilder dispatchChanges(boolean dispatchChanges) { this.dispatchChanges = dispatchChanges; return this; } - - @Nonnull - public LockBasedSchedulerBuilder withMaximumBackoff(long maximumBackoff) { - this.maximumBackoff = maximumBackoff; - return this; - } - + @Nonnull public LockBasedScheduler build() { return new LockBasedScheduler(this); } - + } - + public static LockBasedSchedulerBuilder builder(@Nonnull Revisions revisions, @Nonnull SegmentReader reader) { return new LockBasedSchedulerBuilder(checkNotNull(revisions), checkNotNull(reader)); } @@ -150,12 +139,8 @@ public class LockBasedScheduler implements Scheduler { private final AtomicReference head; private final ChangeDispatcher changeDispatcher; - - private final Random random = new Random(); - + private final SegmentNodeStoreStats stats; - - private final long maximumBackoff; public LockBasedScheduler(LockBasedSchedulerBuilder builder) { if (COMMIT_FAIR_LOCK) { @@ -166,13 +151,12 @@ public class LockBasedScheduler implements Scheduler { this.revisions = builder.revisions; this.head = new AtomicReference(reader.readHeadState(revisions)); if (builder.dispatchChanges) { - this.changeDispatcher = new ChangeDispatcher(getHeadNodeState().getChildNode(ROOT)); + this.changeDispatcher = new ChangeDispatcher(head.get().getChildNode(ROOT)); } else { this.changeDispatcher = null; } - + this.stats = new SegmentNodeStoreStats(builder.statsProvider); - this.maximumBackoff = builder.maximumBackoff; } @Override @@ -182,7 +166,7 @@ public class LockBasedScheduler implements Scheduler { } return NOOP; } - + @Override public NodeState getHeadNodeState() { if (commitSemaphore.tryAcquire()) { @@ -194,7 +178,7 @@ public class LockBasedScheduler implements Scheduler { } return head.get(); } - + /** * Refreshes the head state. Should only be called while holding a permit * from the {@link #commitSemaphore}. @@ -217,9 +201,10 @@ public class LockBasedScheduler implements Scheduler { changeDispatcher.contentChanged(root, info); } } - + @Override - public NodeState schedule(@Nonnull Commit commit, SchedulerOption... schedulingOptions) throws CommitFailedException { + public NodeState schedule(@Nonnull Commit commit, SchedulerOption... schedulingOptions) + throws CommitFailedException { boolean queued = false; try { @@ -260,95 +245,22 @@ public class LockBasedScheduler implements Scheduler { } } - private NodeState execute(Commit commit) - throws CommitFailedException, InterruptedException { + private NodeState execute(Commit commit) throws CommitFailedException, InterruptedException { // only do the merge if there are some changes to commit if (commit.hasChanges()) { - long timeout = optimisticMerge(commit); - if (timeout >= 0) { - pessimisticMerge(commit, timeout); - } - } - return head.get().getChildNode(ROOT); - } - - private long optimisticMerge(Commit commit) - throws CommitFailedException, InterruptedException { - long timeout = 1; - - // use exponential backoff in case of concurrent commits - for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) { - long start = System.nanoTime(); - refreshHead(true); - SegmentNodeState state = head.get(); - if (state.hasProperty("token") - && state.getLong("timeout") >= currentTimeMillis()) { - // someone else has a pessimistic lock on the journal, - // so we should not try to commit anything yet - } else { - // use optimistic locking to update the journal - if (setHead(state, commit.apply(state), commit.info())) { - return -1; - } - } - - // someone else was faster, so wait a while and retry later - Thread.sleep(backoff, random.nextInt(1000000)); - - long stop = System.nanoTime(); - if (stop - start > timeout) { - timeout = stop - start; + SegmentNodeState before = head.get(); + SegmentNodeState after = commit.apply(before); + if (revisions.setHead(before.getRecordId(), after.getRecordId())) { + head.set(after); + contentChanged(after.getChildNode(ROOT), commit.info()); + refreshHead(true); } } - return MILLISECONDS.convert(timeout, NANOSECONDS); + return head.get().getChildNode(ROOT); } - private void pessimisticMerge(Commit commit, long timeout) - throws CommitFailedException, InterruptedException { - while (true) { - long now = currentTimeMillis(); - SegmentNodeState state = head.get(); - if (state.hasProperty("token") - && state.getLong("timeout") >= now) { - // locked by someone else, wait until unlocked or expired - Thread.sleep( - Math.min(state.getLong("timeout") - now, 1000), - random.nextInt(1000000)); - } else { - // attempt to acquire the lock - SegmentNodeBuilder builder = state.builder(); - builder.setProperty("token", UUID.randomUUID().toString()); - builder.setProperty("timeout", now + timeout); - - if (setHead(state, builder.getNodeState(), commit.info())) { - // lock acquired; rebase, apply commit hooks, and unlock - builder = commit.apply(state).builder(); - builder.removeProperty("token"); - builder.removeProperty("timeout"); - - // complete the commit - if (setHead(state, builder.getNodeState(), commit.info())) { - return; - } - } - } - } - } - - private boolean setHead(SegmentNodeState before, SegmentNodeState after, CommitInfo info) { - refreshHead(true); - if (revisions.setHead(before.getRecordId(), after.getRecordId())) { - head.set(after); - contentChanged(after.getChildNode(ROOT), info); - refreshHead(true); - return true; - } else { - return false; - } - } - @Override public String checkpoint(long lifetime, @Nonnull Map properties) { checkArgument(lifetime > 0); @@ -391,8 +303,7 @@ public class LockBasedScheduler implements Scheduler { SegmentNodeState state = head.get(); SegmentNodeBuilder builder = state.builder(); - NodeBuilder cp = builder.child("checkpoints").child( - name); + NodeBuilder cp = builder.child("checkpoints").child(name); if (cp.exists()) { cp.remove(); SegmentNodeState newState = builder.getNodeState(); diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/Scheduler.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/Scheduler.java index 63ae20b5a7..039efad8e7 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/Scheduler.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/Scheduler.java @@ -17,13 +17,11 @@ package org.apache.jackrabbit.oak.segment.scheduler; -import java.io.Closeable; import java.util.Map; import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.api.CommitFailedException; -import org.apache.jackrabbit.oak.spi.commit.Observer; import org.apache.jackrabbit.oak.spi.state.NodeState; /** @@ -90,13 +88,4 @@ public interface Scheduler { * @return the latest state. */ NodeState getHeadNodeState(); - - /** - * Register a new {@code Observer}. Clients need to call {@link Closeable#close()} - * to stop getting notifications on the registered observer and to free up any resources - * associated with the registration. - * - * @return a {@code Closeable} instance. - */ - Closeable addObserver(Observer observer); } \ No newline at end of file diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java index ec09d35310..405bb3182e 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java @@ -20,23 +20,15 @@ package org.apache.jackrabbit.oak.segment; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.segment.memory.MemoryStore; -import org.apache.jackrabbit.oak.spi.commit.CommitHook; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; -import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; -import org.junit.Ignore; import org.junit.Test; public class MergeTest { @@ -86,59 +78,4 @@ public class MergeTest { assertTrue(store.getRoot().hasProperty("foo")); assertTrue(store.getRoot().hasProperty("bar")); } - - @Test - @Ignore("OAK-4122") - public void testPessimisticMerge() throws Exception { - final SegmentNodeStore store = SegmentNodeStoreBuilders.builder(new MemoryStore()).build(); - final Semaphore semaphore = new Semaphore(0); - final AtomicBoolean running = new AtomicBoolean(true); - - Thread background = new Thread() { - @Override - public void run() { - for (int i = 0; running.get(); i++) { - try { - NodeBuilder a = store.getRoot().builder(); - a.setProperty("foo", "abc" + i); - store.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY); - semaphore.release(); - } catch (CommitFailedException e) { - fail(); - } - } - } - }; - background.start(); - - // wait for the first commit - semaphore.acquire(); - - assertTrue(store.getRoot().hasProperty("foo")); - assertFalse(store.getRoot().hasProperty("bar")); - - NodeBuilder b = store.getRoot().builder(); - b.setProperty("bar", "xyz"); - - // FIXME OAK-4122 - // store.setMaximumBackoff(100); - store.merge(b, new CommitHook() { - @Override @Nonnull - public NodeState processCommit( - NodeState before, NodeState after, CommitInfo info) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - fail(); - } - return after; - } - }, CommitInfo.EMPTY); - - assertTrue(store.getRoot().hasProperty("foo")); - assertTrue(store.getRoot().hasProperty("bar")); - - running.set(false); - background.join(); - } }