Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java	(revision 1746535)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java	(working copy)
@@ -34,6 +34,8 @@
 import javax.annotation.Nonnull;
 
 import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
 
 /**
  * This {@link WriteOperationHandler} uses a pool of {@link SegmentBufferWriter}s,
@@ -43,8 +45,27 @@
  * {@link SegmentWriter}.
  */
 public class SegmentBufferWriterPool implements WriteOperationHandler {
+
+    /**
+     * Monitor protecting the state of this pool. None of {@link #writers},
+     * {@link #borrowed} and {@link #disposed} may be modified without owning
+     * this monitor.
+     */
+    private final Monitor poolMonitor = new Monitor(true);
+
+    /**
+     * Pool of current writers that are not in use
+     */
     private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+
+    /**
+     * Writers that are currently in use
+     */
     private final Set<SegmentBufferWriter> borrowed = newHashSet();
+
+    /**
+     * Retired writers that have not yet been flushed
+     */
     private final Set<SegmentBufferWriter> disposed = newHashSet();
 
     @Nonnull
@@ -95,41 +116,114 @@
     @Override
     public void flush() throws IOException {
         List<SegmentBufferWriter> toFlush = newArrayList();
-        synchronized (this) {
+        List<SegmentBufferWriter> toReturn = newArrayList();
+
+        poolMonitor.enter();
+        try {
+            // Collect all writers that are not currently in use and clear
+            // the list so they won't get re-used anymore.
             toFlush.addAll(writers.values());
-            toFlush.addAll(disposed);
             writers.clear();
-            disposed.clear();
+
+            // Collect all borrowed writers, which we need to wait for.
+            // Clear the list so they will get disposed once returned.
+            toReturn.addAll(borrowed);
             borrowed.clear();
+        } finally {
+            poolMonitor.leave();
         }
-        // Call flush from outside a synchronized context to avoid
+
+        // Wait for the return of the borrowed writers. This is the
+        // case once all of them appear in the disposed set.
+        if (safeEnterWhen(poolMonitor, allReturned(toReturn))) {
+            try {
+                // Collect all disposed writers and clear the list to mark them
+                // as flushed.
+                toFlush.addAll(toReturn);
+                disposed.removeAll(toReturn);
+            } finally {
+                poolMonitor.leave();
+            }
+        }
+
+        // Call flush from outside the pool monitor to avoid potential
         // deadlocks of that method calling SegmentStore.writeSegment
         for (SegmentBufferWriter writer : toFlush) {
             writer.flush();
         }
     }
 
-    private synchronized SegmentBufferWriter borrowWriter(Object key) {
-        SegmentBufferWriter writer = writers.remove(key);
-        if (writer == null) {
-            writer = new SegmentBufferWriter(store, tracker, reader, version, getWriterId(wid), gcGeneration.get());
-        } else if (writer.getGeneration() != gcGeneration.get()) {
-            disposed.add(writer);
-            writer = new SegmentBufferWriter(store, tracker, reader, version, getWriterId(wid), gcGeneration.get());
+    /**
+     * Create a {@code Guard} that is satisfied if and only if {@link #disposed}
+     * contains all items in {@code toReturn}
+     */
+    @Nonnull
+    private Guard allReturned(final List<SegmentBufferWriter> toReturn) {
+        return new Guard(poolMonitor) {
+
+            @Override
+            public boolean isSatisfied() {
+                return disposed.containsAll(toReturn);
+            }
+
+        };
+    }
+
+    /**
+     * Same as {@code monitor.enterWhen(guard)} but copes with that pesky {@code
+     * InterruptedException} by catching it and setting this thread's
+     * interrupted flag.
+     */
+    private static boolean safeEnterWhen(Monitor monitor, Guard guard) {
+        try {
+            monitor.enterWhen(guard);
+            return true;
+        } catch (InterruptedException ignore) {
+            Thread.currentThread().interrupt();
+            return false;
         }
-        borrowed.add(writer);
-        return writer;
     }
 
-    private synchronized void returnWriter(Object key, SegmentBufferWriter writer) {
-        if (borrowed.remove(writer)) {
-            checkState(writers.put(key, writer) == null);
-        } else {
-            // Defer flush this writer as it was borrowed while flush() was called.
-            disposed.add(writer);
+    /**
+     * Return a writer from the pool by its {@code key}. This method may return
+     * a fresh writer at any time. Callers need to return a writer before
+     * borrowing it again. Failing to do so leads to undefined behaviour.
+     */
+    private SegmentBufferWriter borrowWriter(Object key) {
+        poolMonitor.enter();
+        try {
+            SegmentBufferWriter writer = writers.remove(key);
+            if (writer == null) {
+                writer = new SegmentBufferWriter(store, tracker, reader, version, getWriterId(wid), gcGeneration.get());
+            } else if (writer.getGeneration() != gcGeneration.get()) {
+                disposed.add(writer);
+                writer = new SegmentBufferWriter(store, tracker, reader, version, getWriterId(wid), gcGeneration.get());
+            }
+            borrowed.add(writer);
+            return writer;
+        } finally {
+            poolMonitor.leave();
         }
     }
 
+    /**
+     * Return a writer to the pool using the {@code key} that was used to borrow
+     * it.
+     */
+    private void returnWriter(Object key, SegmentBufferWriter writer) {
+        poolMonitor.enter();
+        try {
+            if (borrowed.remove(writer)) {
+                checkState(writers.put(key, writer) == null);
+            } else {
+                // Defer flushing this writer as it was borrowed while flush() was called.
+                disposed.add(writer);
            }
+        } finally {
+            poolMonitor.leave();
+        }
+    }
+
     private String getWriterId(String wid) {
         if (++writerId > 9999) {
             writerId = 0;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java	(revision 1746535)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java	(working copy)
@@ -58,9 +58,6 @@
 
     /**
      * Flush any pending changes on any {@link SegmentBufferWriter} managed by this instance.
-     * This method does not block to wait for concurrent write operations. However, if
-     * a write operation is currently in progress a call to this method ensures the respective
-     * changes are properly flushed at the end of that call.
      * @throws IOException
     */
    void flush() throws IOException;
Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
===================================================================
--- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java	(revision 0)
+++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java	(working copy)
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+import static com.google.common.collect.Sets.intersection;
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Suppliers;
+import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
+import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
+import org.junit.After;
+import org.junit.Test;
+
+public class SegmentBufferWriterPoolTest {
+    private final MemoryStore store = new MemoryStore();
+
+    private final RecordId rootId = store.getRevisions().getHead();
+
+    private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
+            store, store.getTracker(), store.getReader(), LATEST_VERSION, "", Suppliers.ofInstance(0));
+
+    private final ExecutorService[] executors = new ExecutorService[] {
+            newSingleThreadExecutor(), newSingleThreadExecutor(), newSingleThreadExecutor()};
+
+    public SegmentBufferWriterPoolTest() throws IOException { }
+
+    private Future<RecordId> execute(final WriteOperation op, int executor) {
+        return executors[executor].submit(new Callable<RecordId>() {
+            @Override
+            public RecordId call() throws Exception {
+                return pool.execute(op);
+            }
+        });
+    }
+
+    private WriteOperation createOp(final String key, final ConcurrentMap<String, SegmentBufferWriter> map) {
+        return new WriteOperation() {
+            @Nonnull @Override
+            public RecordId execute(@Nonnull SegmentBufferWriter writer) {
+                map.put(key, writer);
+                return rootId;
+            }
+        };
+    }
+
+    @After
+    public void tearDown() {
+        for (ExecutorService executor : executors) {
+            executor.shutdown();
+        }
+    }
+
+    @Test
+    public void testThreadAffinity() throws IOException, ExecutionException, InterruptedException {
+        ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
+        Future<RecordId> res1 = execute(createOp("a", map1), 0);
+        Future<RecordId> res2 = execute(createOp("b", map1), 1);
+        Future<RecordId> res3 = execute(createOp("c", map1), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res1.get());
+        assertEquals(rootId, res2.get());
+        assertEquals(rootId, res3.get());
+        assertEquals(3, map1.size());
+
+        ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
+        Future<RecordId> res4 = execute(createOp("a", map2), 0);
+        Future<RecordId> res5 = execute(createOp("b", map2), 1);
+        Future<RecordId> res6 = execute(createOp("c", map2), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res4.get());
+        assertEquals(rootId, res5.get());
+        assertEquals(rootId, res6.get());
+        assertEquals(3, map2.size());
+        assertEquals(map1, map2);
+    }
+
+    @Test
+    public void testFlush() throws ExecutionException, InterruptedException, IOException {
+        ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
+        Future<RecordId> res1 = execute(createOp("a", map1), 0);
+        Future<RecordId> res2 = execute(createOp("b", map1), 1);
+        Future<RecordId> res3 = execute(createOp("c", map1), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res1.get());
+        assertEquals(rootId, res2.get());
+        assertEquals(rootId, res3.get());
+        assertEquals(3, map1.size());
+
+        pool.flush();
+
+        ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
+        Future<RecordId> res4 = execute(createOp("a", map2), 0);
+        Future<RecordId> res5 = execute(createOp("b", map2), 1);
+        Future<RecordId> res6 = execute(createOp("c", map2), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res4.get());
+        assertEquals(rootId, res5.get());
+        assertEquals(rootId, res6.get());
+        assertEquals(3, map2.size());
+        assertTrue(intersection(newHashSet(map1.values()), newHashSet(map2.values())).isEmpty());
+    }
+
+    @Test
+    public void testFlushBlocks() throws ExecutionException, InterruptedException {
+        Future<RecordId> res = execute(new WriteOperation() {
+            @CheckForNull @Override
+            public RecordId execute(@Nonnull SegmentBufferWriter writer) {
+                try {
+                    // This should deadlock as flush waits for this write
+                    // operation to finish, which in this case contains the
+                    // call to flush itself.
+                    executors[1].submit(new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            pool.flush();
+                            return null;
+                        }
+                    }).get(100, MILLISECONDS);
+                    return null;    // No deadlock -> null indicates test failure
+                } catch (InterruptedException | ExecutionException ignore) {
+                    return null;    // No deadlock -> null indicates test failure
+                } catch (TimeoutException e) {
+                    return rootId;  // Deadlock -> rootId indicates test pass
+                }
+            }
+        }, 0);
+
+        // Give the task some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+        assertEquals(rootId, res.get());
+    }
+
+}

Property changes on: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
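
Note on the synchronization idiom used by this patch: the coarse synchronized methods are replaced with a Guava Monitor so that flush() can wait for borrowed writers to be returned without holding a lock across writer.flush() (which calls SegmentStore.writeSegment). The self-contained sketch below is illustrative only and not part of the patch; the class and method names (MonitorGuardSketch, markReturned, awaitAll) are made up for the example. It shows the Monitor/Guard mechanics the pool relies on: a Guard's isSatisfied() is evaluated only while the monitor is held, enterWhen() blocks until the guard is satisfied and reacquires the monitor atomically, and leaving the monitor after a state change re-evaluates the guards that other threads are waiting on.

    import java.util.HashSet;
    import java.util.Set;

    import com.google.common.util.concurrent.Monitor;
    import com.google.common.util.concurrent.Monitor.Guard;

    // Illustrative sketch of the Monitor/Guard idiom; names are hypothetical.
    public class MonitorGuardSketch {
        // Fair monitor, mirroring poolMonitor = new Monitor(true) in the patch.
        private final Monitor monitor = new Monitor(true);

        // Guarded state; only touched while the monitor is held.
        private final Set<String> returned = new HashSet<>();

        // Producer side: record an item as returned. Leaving the monitor
        // causes waiting threads to re-evaluate their guards.
        public void markReturned(String item) {
            monitor.enter();
            try {
                returned.add(item);
            } finally {
                monitor.leave();
            }
        }

        // Consumer side: block until all expected items have been returned,
        // analogous to flush() waiting on allReturned(toReturn).
        public boolean awaitAll(final Set<String> expected) {
            Guard allBack = new Guard(monitor) {
                @Override
                public boolean isSatisfied() {
                    return returned.containsAll(expected);
                }
            };
            try {
                monitor.enterWhen(allBack);  // blocks until the guard holds
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // same strategy as safeEnterWhen
                return false;
            }
            try {
                // The monitor is held here, so the condition is guaranteed
                // and the guarded state can be read or drained safely.
                return true;
            } finally {
                monitor.leave();
            }
        }
    }

This is also why flush() can drain the disposed set immediately after safeEnterWhen() returns true: at that point the monitor is held and the guard's condition is known to hold.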