Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java (revision d5ea72a43e62dc539fa9ccf4785e71ab75723d7d) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java (revision d17b341033bd75698dfe13844407334b24b09fdc) @@ -34,6 +34,8 @@ import javax.annotation.Nonnull; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.Monitor; +import com.google.common.util.concurrent.Monitor.Guard; /** * This {@link WriteOperationHandler} uses a pool of {@link SegmentBufferWriter}s, @@ -43,8 +45,26 @@ * {@link SegmentWriter}. */ public class SegmentBufferWriterPool implements WriteOperationHandler { + /** + * Monitor protecting the state of this pool. Neither of {@link #writers}, + * {@link #borrowed} and {@link #disposed} must be modified without owning + * this monitor. + */ + private final Monitor poolMonitor = new Monitor(true); + + /** + * Pool of current writers that are not in use + */ private final Map writers = newHashMap(); + + /** + * Writers that are currently in use + */ private final Set borrowed = newHashSet(); + + /** + * Retired writers that have not yet been flushed + */ private final Set disposed = newHashSet(); @Nonnull @@ -92,41 +112,127 @@ } } + /** Monitor preventing multiple concurrent calls to {@link #flush} */ + private final Monitor flushMonitor = new Monitor(true); + @Override public void flush() throws IOException { + // Ensure flush is not called concurrently + flushMonitor.enter(); + + try { - List toFlush = newArrayList(); + List toFlush = newArrayList(); - synchronized (this) { + List toReturn = newArrayList(); + poolMonitor.enter(); + try { + // Collect all writers that are not currently in use and clear + // the list so they won't get re-used anymore. - toFlush.addAll(writers.values()); + toFlush.addAll(writers.values()); - toFlush.addAll(disposed); - writers.clear(); + writers.clear(); - disposed.clear(); + + // Collect all borrowed writers, which we need to wait for. + // Clear the list so they will get disposed once returned. + toReturn.addAll(borrowed); - borrowed.clear(); + borrowed.clear(); + } finally { + poolMonitor.leave(); - } + } - // Call flush from outside a synchronized context to avoid + + // Wait for the return of the borrowed writers. This is the + // case once all of them appear in the disposed set. + if (safeEnterWhen(poolMonitor, allReturned(toReturn))) { + try { + // Collect all disposed writers and clear the list to mark them + // as flushed. + toFlush.addAll(disposed); + disposed.clear(); + } finally { + poolMonitor.leave(); + } + + // Call flush from outside the pool monitor to avoid potential - // deadlocks of that method calling SegmentStore.writeSegment - for (SegmentBufferWriter writer : toFlush) { - writer.flush(); - } - } + // deadlocks of that method calling SegmentStore.writeSegment + for (SegmentBufferWriter writer : toFlush) { + writer.flush(); + } + } + } finally { + flushMonitor.leave(); + } + } - private synchronized SegmentBufferWriter borrowWriter(Object key) { + /** + * Create a {@code Guard} that is satisfied if and only if + * {@link #disposed} contains all items in {@code toReturn} + */ + @Nonnull + private Guard allReturned(final List toReturn) { + poolMonitor.enter(); + try { + return new Guard(poolMonitor) { + @Override + public boolean isSatisfied() { + return disposed.containsAll(toReturn); + } + }; + } finally { + poolMonitor.leave(); + } + } + + /** + * Same as {@code monitor.enterWhen(guard)} but copes with that + * pesky {@code InterruptedException} by catching it and setting this + * thread's interrupted flag. + */ + private static boolean safeEnterWhen(Monitor monitor, Guard guard) { + try { + monitor.enterWhen(guard); + return true; + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + return false; + } + } + + /** + * Return a writer from the pool by its {@code key}. This method may + * return a fresh writer at any time. Callers need to return a writer + * before borrowing it again. Failing to do so leads to undefined + * behaviour. + */ + private SegmentBufferWriter borrowWriter(Object key) { + poolMonitor.enter(); + try { - SegmentBufferWriter writer = writers.remove(key); - if (writer == null) { - writer = new SegmentBufferWriter(store, tracker, reader, version, getWriterId(wid), gcGeneration.get()); - } else if (writer.getGeneration() != gcGeneration.get()) { - disposed.add(writer); - writer = new SegmentBufferWriter(store, tracker, reader, version, getWriterId(wid), gcGeneration.get()); - } - borrowed.add(writer); - return writer; + SegmentBufferWriter writer = writers.remove(key); + if (writer == null) { + writer = new SegmentBufferWriter(store, tracker, reader, version, getWriterId(wid), gcGeneration.get()); + } else if (writer.getGeneration() != gcGeneration.get()) { + disposed.add(writer); + writer = new SegmentBufferWriter(store, tracker, reader, version, getWriterId(wid), gcGeneration.get()); + } + borrowed.add(writer); + return writer; + } finally { + poolMonitor.leave(); - } + } + } - private synchronized void returnWriter(Object key, SegmentBufferWriter writer) { + /** + * Return a writer to the pool using the {@code key} that was used + * to borrow it. + */ + private void returnWriter(Object key, SegmentBufferWriter writer) { + poolMonitor.enter(); + try { - if (borrowed.remove(writer)) { - checkState(writers.put(key, writer) == null); - } else { - // Defer flush this writer as it was borrowed while flush() was called. - disposed.add(writer); + if (borrowed.remove(writer)) { + checkState(writers.put(key, writer) == null); + } else { + // Defer flush this writer as it was borrowed while flush() was called. + disposed.add(writer); + } + } finally { + poolMonitor.leave(); } } Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java (revision d5ea72a43e62dc539fa9ccf4785e71ab75723d7d) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java (revision d17b341033bd75698dfe13844407334b24b09fdc) @@ -58,9 +58,6 @@ /** * Flush any pending changes on any {@link SegmentBufferWriter} managed by this instance. - * This method does not block to wait for concurrent write operations. However, if - * a write operation is currently in progress a call to this method ensures the respective - * changes are properly flushed at the end of that call. * @throws IOException */ void flush() throws IOException;