diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
index 6ca988694f..47fefe39b0 100644
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
@@ -19,20 +19,25 @@ package org.apache.jackrabbit.oak.segment.scheduler;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.api.Type.LONG;
 
 import java.io.Closeable;
+import java.text.MessageFormat;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingWindowReservoir;
 import javax.annotation.Nonnull;
-
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.segment.Revisions;
@@ -51,9 +56,6 @@ import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.SlidingWindowReservoir;
-
 public class LockBasedScheduler implements Scheduler {
 
     public static class LockBasedSchedulerBuilder {
@@ -122,6 +124,12 @@ public class LockBasedScheduler implements Scheduler {
     private static final double SCHEDULER_FETCH_COMMIT_DELAY_QUANTILE = Double
             .parseDouble(System.getProperty("oak.scheduler.fetch.commitDelayQuantile", "0.5"));
 
+    /**
+     * Maximum number of milliseconds to wait before re-attempting to update the current
+     * head state after a successful commit, provided a concurrent head state update happens.
+     */
+    private static final long MAXIMUM_BACKOFF = MILLISECONDS.convert(10, SECONDS);
+
     /**
      * Sets the number of seconds to wait for the attempt to grab the lock to
      * create a checkpoint
@@ -149,6 +157,7 @@ public class LockBasedScheduler implements Scheduler {
 
     private final Histogram commitTimeHistogram = new Histogram(new SlidingWindowReservoir(1000));
 
+    private final Random random = new Random();
 
     public LockBasedScheduler(LockBasedSchedulerBuilder builder) {
         if (COMMIT_FAIR_LOCK) {
@@ -244,19 +253,37 @@ public class LockBasedScheduler implements Scheduler {
         }
     }
 
-    private NodeState execute(Commit commit) throws CommitFailedException {
+    private NodeState execute(Commit commit) throws CommitFailedException, InterruptedException {
         // only do the merge if there are some changes to commit
         if (commit.hasChanges()) {
-            refreshHead(true);
-            SegmentNodeState before = head.get();
-            SegmentNodeState after = commit.apply(before);
-            if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
-                head.set(after);
-                contentChanged(after.getChildNode(ROOT), commit.info());
+            long start = System.nanoTime();
+
+            for (long backoff = 1; backoff < MAXIMUM_BACKOFF; backoff *= 2) {
                 refreshHead(true);
+                SegmentNodeState before = head.get();
+                SegmentNodeState after = commit.apply(before);
+
+                if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
+                    head.set(after);
+                    contentChanged(after.getChildNode(ROOT), commit.info());
+                    refreshHead(true);
+
+                    return head.get().getChildNode(ROOT);
+                }
+
+                int randNs = random.nextInt(1_000_000);
+                log.info("Scheduler detected concurrent commits. Backing off for {} ms and {} ns", backoff, randNs);
+                Thread.sleep(backoff, randNs);
             }
+
+            long finish = System.nanoTime();
+
+            String message = MessageFormat.format(
+                    "The commit could not be executed after 14 attempts. Total wait time: {0} ms",
+                    NANOSECONDS.toMillis(finish - start));
+            throw new CommitFailedException("Segment", 3, message);
         }
-        
+
         return head.get().getChildNode(ROOT);
     }
 