Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision 1712586) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (working copy) @@ -93,6 +93,13 @@ private long maximumBackoff = MILLISECONDS.convert(10, SECONDS); + /** + * Sets the number of seconds to wait for the attempt to grab the lock to + * create a checkpoint + */ + private int checkpointsLockWaitTime = Integer.getInteger( + "oak.checkpoints.lockWaitTime", 10); + @Nonnull public static SegmentNodeStoreBuilder newSegmentNodeStore( @Nonnull SegmentStore store) { @@ -279,57 +286,68 @@ checkArgument(lifetime > 0); checkNotNull(properties); String name = UUID.randomUUID().toString(); - long now = System.currentTimeMillis(); - - // try 5 times - for (int i = 0; i < 5; i++) { - if (commitSemaphore.tryAcquire()) { - try { - refreshHead(); + try { + CPCreator cpc = new CPCreator(name, lifetime, properties); + if (locked(cpc, checkpointsLockWaitTime, TimeUnit.SECONDS)) { + return name; + } + log.warn("Failed to create checkpoint {} in {} seconds.", name, + checkpointsLockWaitTime); + } catch (Exception e) { + log.error("Failed to create checkpoint {}.", name, e); + } + return name; + } - SegmentNodeState state = head.get(); - SegmentNodeBuilder builder = state.builder(); + private final class CPCreator implements Callable { - NodeBuilder checkpoints = builder.child("checkpoints"); - for (String n : checkpoints.getChildNodeNames()) { - NodeBuilder cp = checkpoints.getChildNode(n); - PropertyState ts = cp.getProperty("timestamp"); - if (ts == null - || ts.getType() != Type.LONG - || now > ts.getValue(Type.LONG)) { - cp.remove(); - } - } + private final String name; + private final long lifetime; + private final Map properties; - NodeBuilder cp = checkpoints.child(name); - cp.setProperty("timestamp", now + lifetime); - cp.setProperty("created", now); + CPCreator(String name, long lifetime, Map properties) { + this.name = name; + this.lifetime = lifetime; + this.properties = properties; + } - NodeBuilder props = cp.setChildNode("properties"); - for (Entry p : properties.entrySet()) { - props.setProperty(p.getKey(), p.getValue()); - } + @Override + public Boolean call() { + long now = System.currentTimeMillis(); - cp.setChildNode(ROOT, state.getChildNode(ROOT)); + refreshHead(); - SegmentNodeState newState = builder.getNodeState(); - if (store.setHead(state, newState)) { - refreshHead(); - return name; - } else { - log.debug( - "Unable to update the head state for checkpoint {} ({}/5)", - new Object[] { name, i + 1 }); - } + SegmentNodeState state = head.get(); + SegmentNodeBuilder builder = state.builder(); - } finally { - commitSemaphore.release(); + NodeBuilder checkpoints = builder.child("checkpoints"); + for (String n : checkpoints.getChildNodeNames()) { + NodeBuilder cp = checkpoints.getChildNode(n); + PropertyState ts = cp.getProperty("timestamp"); + if (ts == null || ts.getType() != Type.LONG + || now > ts.getValue(Type.LONG)) { + cp.remove(); } } - } - log.debug("Failed to create checkpoint {}", name); - return name; + NodeBuilder cp = checkpoints.child(name); + cp.setProperty("timestamp", now + lifetime); + cp.setProperty("created", now); + + NodeBuilder props = cp.setChildNode("properties"); + for (Entry p : properties.entrySet()) { + props.setProperty(p.getKey(), p.getValue()); + } + cp.setChildNode(ROOT, state.getChildNode(ROOT)); + + SegmentNodeState newState = builder.getNodeState(); + if (store.setHead(state, newState)) { + refreshHead(); + return true; + } else { + return false; + } + } } @Override @Nonnull @@ -537,4 +555,12 @@ } + /** + * Sets the number of seconds to wait for the attempt to grab the lock to + * create a checkpoint + */ + void setCheckpointsLockWaitTime(int checkpointsLockWaitTime) { + this.checkpointsLockWaitTime = checkpointsLockWaitTime; + } + } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CheckpointTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CheckpointTest.java (revision 1712586) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CheckpointTest.java (working copy) @@ -16,12 +16,15 @@ */ package org.apache.jackrabbit.oak.plugins.segment; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore; @@ -87,4 +90,95 @@ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); } + /** + * OAK-3587 test simulates a timeout while trying to create a checkpoint, + * then releases the lock and tries again + */ + @Test + public void testShortWait() throws Exception { + final SegmentNodeStore store = new SegmentNodeStore(new MemoryStore()); + store.setCheckpointsLockWaitTime(1); + + final Semaphore semaphore = new Semaphore(0); + final AtomicBoolean blocking = new AtomicBoolean(true); + + final Callable block = new Callable() { + + @Override + public Boolean call() { + while (blocking.get()) { + if (semaphore.availablePermits() == 0) { + semaphore.release(); + } + } + return true; + } + }; + + Thread background = new Thread() { + @Override + public void run() { + try { + store.locked(block); + } catch (Exception e) { + // + } + } + }; + + background.start(); + semaphore.acquire(); + + String cp0 = store.checkpoint(10); + assertNull(store.retrieve(cp0)); + + blocking.set(false); + String cp1 = store.checkpoint(10); + assertNotNull(store.retrieve(cp1)); + } + + /** + * OAK-3587 test simulates a wait less than configured + * {@code SegmentNodeStore#setCheckpointsLockWaitTime(int)} value so the + * checkpoint call must return a valid value + */ + @Test + public void testLongWait() throws Exception { + final int blockTime = 1; + final SegmentNodeStore store = new SegmentNodeStore(new MemoryStore()); + store.setCheckpointsLockWaitTime(blockTime + 1); + + final Semaphore semaphore = new Semaphore(0); + + final Callable block = new Callable() { + + @Override + public Boolean call() { + try { + semaphore.release(); + TimeUnit.SECONDS.sleep(blockTime); + } catch (InterruptedException e) { + // + } + return true; + } + }; + + Thread background = new Thread() { + @Override + public void run() { + try { + store.locked(block); + } catch (Exception e) { + // + } + } + }; + + background.start(); + semaphore.acquire(); + + String cp0 = store.checkpoint(10); + assertNotNull(store.retrieve(cp0)); + } }