diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java index 17c7cc6657..c587584706 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java @@ -356,8 +356,17 @@ public class LockBasedScheduler implements Scheduler { String name = UUID.randomUUID().toString(); try { CPCreator cpc = new CPCreator(name, lifetime, properties); - if (locked(cpc, checkpointsLockWaitTime, TimeUnit.SECONDS)) { - return name; + if (commitSemaphore.tryAcquire(checkpointsLockWaitTime, TimeUnit.SECONDS)) { + try { + if (cpc.call()) { + return name; + } + } finally { + // Explicitly give up reference to the previous root state + // otherwise they would block cleanup. See OAK-3347 + refreshHead(true); + commitSemaphore.release(); + } } log.warn("Failed to create checkpoint {} in {} seconds.", name, checkpointsLockWaitTime); } catch (InterruptedException e) { @@ -369,34 +378,6 @@ public class LockBasedScheduler implements Scheduler { return name; } - /** - * Execute the passed callable with trying to acquire this store's commit - * lock. - * - * @param timeout - * the maximum time to wait for the store's commit lock - * @param unit - * the time unit of the {@code timeout} argument - * @param c - * callable to execute - * @return {@code false} if the store's commit lock cannot be acquired, the - * result of {@code c.call()} otherwise. - * @throws Exception - */ - private boolean locked(Callable c, long timeout, TimeUnit unit) throws Exception { - if (commitSemaphore.tryAcquire(timeout, unit)) { - try { - return c.call(); - } finally { - // Explicitly give up reference to the previous root state - // otherwise they would block cleanup. See OAK-3347 - refreshHead(true); - commitSemaphore.release(); - } - } - return false; - } - @Override public boolean removeCheckpoint(String name) { checkNotNull(name); diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointTest.java index 913a587aee..dd385885db 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointTest.java @@ -18,17 +18,13 @@ */ package org.apache.jackrabbit.oak.segment; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.segment.memory.MemoryStore; @@ -37,7 +33,6 @@ import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; -import org.junit.Ignore; import org.junit.Test; public class CheckpointTest { @@ -95,106 +90,6 @@ public class CheckpointTest { store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); } - /** - * OAK-3587 test simulates a timeout while trying to create a checkpoint, - * then releases the lock and tries again - */ - @Test - @Ignore("OAK-4122") - public void testShortWait() throws Exception { - final SegmentNodeStore store = SegmentNodeStoreBuilders.builder(new MemoryStore()).build(); - - // FIXME OAK-4122 - // store.setCheckpointsLockWaitTime(1); - - final Semaphore semaphore = new Semaphore(0); - final AtomicBoolean blocking = new AtomicBoolean(true); - - final Callable block = new Callable() { - - @Override - public Boolean call() { - while (blocking.get()) { - if (semaphore.availablePermits() == 0) { - semaphore.release(); - } - } - return true; - } - }; - - Thread background = new Thread() { - @Override - public void run() { - try { - // FIXME OAK-4122 - // store.locked(block, 10, SECONDS); - } catch (Exception e) { - // - } - } - }; - - background.start(); - semaphore.acquire(); - - String cp0 = store.checkpoint(10); - assertNull(store.retrieve(cp0)); - - blocking.set(false); - String cp1 = store.checkpoint(10); - assertNotNull(store.retrieve(cp1)); - } - - /** - * OAK-3587 test simulates a wait less than configured - * {@code SegmentNodeStore#setCheckpointsLockWaitTime(int)} value so the - * checkpoint call must return a valid value - */ - @Test - @Ignore("OAK-4122") - public void testLongWait() throws Exception { - final int blockTime = 1; - final SegmentNodeStore store = SegmentNodeStoreBuilders.builder(new MemoryStore()).build(); - - // FIXME OAK-4122 - // store.setCheckpointsLockWaitTime(blockTime + 1); - - final Semaphore semaphore = new Semaphore(0); - - final Callable block = new Callable() { - - @Override - public Boolean call() { - try { - semaphore.release(); - SECONDS.sleep(blockTime); - } catch (InterruptedException e) { - // - } - return true; - } - }; - - Thread background = new Thread() { - @Override - public void run() { - try { - // FIXME OAK-4122 - // store.locked(block, 10, SECONDS); - } catch (Exception e) { - // - } - } - }; - - background.start(); - semaphore.acquire(); - - String cp0 = store.checkpoint(10); - assertNotNull(store.retrieve(cp0)); - } - @Test public void testCheckpointMax() throws CommitFailedException, IOException { SegmentNodeStore store = SegmentNodeStoreBuilders.builder( diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java index 7d8eaea3c8..2dec3665d3 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java @@ -18,9 +18,9 @@ */ package org.apache.jackrabbit.oak.segment; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.concurrent.Semaphore; @@ -62,7 +62,7 @@ public class MergeTest { assertTrue(store.getRoot().hasProperty("foo")); assertTrue(store.getRoot().hasProperty("bar")); } - + @Test public void testOptimisticMerge() throws CommitFailedException, IOException { NodeStore store = SegmentNodeStoreBuilders.builder(new MemoryStore()).build(); @@ -141,5 +141,4 @@ public class MergeTest { running.set(false); background.join(); } - } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedSchedulerCheckpointTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedSchedulerCheckpointTest.java new file mode 100644 index 0000000000..a804fceec1 --- /dev/null +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedSchedulerCheckpointTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.scheduler; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.segment.memory.MemoryStore; +import org.apache.jackrabbit.oak.spi.commit.CommitHook; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.junit.Test; + +public class LockBasedSchedulerCheckpointTest { + /** + * OAK-3587 test simulates a timeout while trying to create a checkpoint, + * then releases the lock and tries again + */ + @Test + public void testShortWait() throws Exception { + MemoryStore ms = new MemoryStore(); + System.setProperty("oak.checkpoints.lockWaitTime", "1"); + final LockBasedScheduler scheduler = LockBasedScheduler.builder(ms.getRevisions(), ms.getReader()).build(); + + final Semaphore semaphore = new Semaphore(0); + final AtomicBoolean blocking = new AtomicBoolean(true); + + final Callable block = new Callable() { + + @Override + public Boolean call() { + while (blocking.get()) { + if (semaphore.availablePermits() == 0) { + semaphore.release(); + } + } + return true; + } + }; + + Thread background = new Thread() { + @Override + public void run() { + try { + Commit commit = createBlockingCommit(scheduler, "foo", "bar", block); + scheduler.schedule(commit); + } catch (Exception e) { + // + } + } + }; + + background.start(); + semaphore.acquire(); + + String cp0 = scheduler.checkpoint(10, Collections. emptyMap()); + assertNull(retrieveCheckpoint(scheduler, cp0)); + + blocking.set(false); + String cp1 = scheduler.checkpoint(10, Collections. emptyMap()); + assertNotNull(retrieveCheckpoint(scheduler, cp1)); + } + + /** + * OAK-3587 test simulates a wait less than configured + * {@code SegmentNodeStore#setCheckpointsLockWaitTime(int)} value so the + * checkpoint call must return a valid value + */ + @Test + public void testLongWait() throws Exception { + final int blockTime = 1; + MemoryStore ms = new MemoryStore(); + System.setProperty("oak.checkpoints.lockWaitTime", "2"); + final LockBasedScheduler scheduler = LockBasedScheduler.builder(ms.getRevisions(), ms.getReader()).build(); + + final Semaphore semaphore = new Semaphore(0); + + final Callable block = new Callable() { + + @Override + public Boolean call() { + try { + semaphore.release(); + SECONDS.sleep(blockTime); + } catch (InterruptedException e) { + // + } + return true; + } + }; + + Thread background = new Thread() { + @Override + public void run() { + try { + Commit commit = createBlockingCommit(scheduler, "foo", "bar", block); + scheduler.schedule(commit); + } catch (Exception e) { + // + } + } + }; + + background.start(); + semaphore.acquire(); + + String cp0 = scheduler.checkpoint(10, Collections. emptyMap()); + assertNotNull(retrieveCheckpoint(scheduler, cp0)); + } + + private NodeState retrieveCheckpoint(final Scheduler scheduler, final String checkpoint) { + checkNotNull(checkpoint); + NodeState cp = scheduler.getHeadNodeState().getChildNode("checkpoints").getChildNode(checkpoint) + .getChildNode("root"); + if (cp.exists()) { + return cp; + } + return null; + } + + private NodeState getRoot(Scheduler scheduler) { + return scheduler.getHeadNodeState().getChildNode("root"); + } + + private Commit createBlockingCommit(final Scheduler scheduler, final String property, String value, + final Callable callable) { + NodeBuilder a = getRoot(scheduler).builder(); + a.setProperty(property, value); + Commit blockingCommit = new Commit(a, new CommitHook() { + @Override + @Nonnull + public NodeState processCommit(NodeState before, NodeState after, CommitInfo info) { + try { + callable.call(); + } catch (Exception e) { + fail(); + } + return after; + } + }, CommitInfo.EMPTY); + + return blockingCommit; + } +}