Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (revision 1808972) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (working copy) @@ -56,11 +56,12 @@ import java.util.Map.Entry; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import javax.annotation.CheckForNull; @@ -158,10 +159,38 @@ private final AtomicBoolean sufficientMemory = new AtomicBoolean(true); /** - * Flag signalling shutdown of the file store + * Flag signalling shutdown of this instance. Operations responsive to + * shutdown can periodically monitor this flag and terminate cleanly and + * early if this flag is {@code true}. */ private volatile boolean shutdown; + /** + * Determines if this instance was successfully shut down. This variable is + * protected by the {@link #shutdownLock}. + */ + private boolean shutdownComplete; + + /** + * Protects the state shared between {@link #close()} and every other + * operation in this instance. Moreover, it allows to implement responsive, + * graceful shutdown of long-running operations. + *

+ * The shutdown protocol is as follows. When {@link #close()} is invoked + * {@link #shutdown} is immediately set to {@code true}. Then {@link + * #close()} tries to acquire this lock in exclusive mode to perform the + * shutdown of this instance. When the shutdown completes, {@link + * #shutdownComplete} is set to {@code true} and this lock is released. + *

+ * Every other publicly accessible method needs to acquire this lock in + * non-exclusive mode. Then the method can check {@link #shutdownComplete} + * to determine if this instance was already closed or it can continue with + * its execution. If the method is responsive to shutdown, it can check + * {@link #shutdown} and terminate early. When the method finished, the lock + * is released. + */ + private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock(); + private final FileStoreStats stats; @Nonnull @@ -245,9 +274,17 @@ } FileStore bind(TarRevisions revisions) throws IOException { - this.revisions = revisions; - this.revisions.bind(this, tracker, initialNode()); - return this; + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + this.revisions = revisions; + this.revisions.bind(this, tracker, initialNode()); + return this; + } finally { + shutdownLock.readLock().unlock(); + } } @Nonnull @@ -281,10 +318,16 @@ */ public Runnable getGCRunner() { return new SafeRunnable(format("TarMK revision gc [%s]", directory), () -> { + shutdownLock.readLock().lock(); try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } garbageCollector.run(); } catch (IOException e) { log.error("Error running revision garbage collection", e); + } finally { + shutdownLock.readLock().unlock(); } }); } @@ -300,44 +343,85 @@ * @return the size of this store. */ private long size() { - return tarFiles.size(); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return tarFiles.size(); + } finally { + shutdownLock.readLock().unlock(); + } } - public int readerCount(){ - return tarFiles.readerCount(); + public int readerCount() { + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return tarFiles.readerCount(); + } finally { + shutdownLock.readLock().unlock(); + } } public FileStoreStats getStats() { return stats; } - public void flush() throws IOException { + private void doFlush() throws IOException { if (revisions == null) { return; } - revisions.flush(new Callable() { - @Override - public Void call() throws Exception { - segmentWriter.flush(); - tarFiles.flush(); - stats.flushed(); - return null; - } + revisions.flush(() -> { + segmentWriter.flush(); + tarFiles.flush(); + stats.flushed(); + return null; }); } + public void flush() throws IOException { + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + doFlush(); + } finally { + shutdownLock.readLock().unlock(); + } + } + /** * Run full garbage collection: estimation, compaction, cleanup. */ public void fullGC() throws IOException { - garbageCollector.runFull(); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + garbageCollector.runFull(); + } finally { + shutdownLock.readLock().unlock(); + } } /** * Run tail garbage collection. */ public void tailGC() throws IOException { - garbageCollector.runTail(); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + garbageCollector.runTail(); + } finally { + shutdownLock.readLock().unlock(); + } } /** @@ -345,7 +429,15 @@ * @return */ public GCEstimation estimateCompactionGain() { - return garbageCollector.estimateCompactionGain(); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return garbageCollector.estimateCompactionGain(); + } finally { + shutdownLock.readLock().unlock(); + } } /** @@ -355,11 +447,27 @@ * @return {@code true} on success, {@code false} otherwise. */ public boolean compactFull() { - return garbageCollector.compactFull().isSuccess(); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return garbageCollector.compactFull().isSuccess(); + } finally { + shutdownLock.readLock().unlock(); + } } public boolean compactTail() { - return garbageCollector.compactTail().isSuccess(); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return garbageCollector.compactTail().isSuccess(); + } finally { + shutdownLock.readLock().unlock(); + } } /** @@ -370,11 +478,19 @@ * skipping the reclaimed segments. */ public void cleanup() throws IOException { - CompactionResult compactionResult = CompactionResult.skipped( + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + CompactionResult compactionResult = CompactionResult.skipped( getGcGeneration(), garbageCollector.gcOptions, revisions.getHead()); - fileReaper.add(garbageCollector.cleanup(compactionResult)); + fileReaper.add(garbageCollector.cleanup(compactionResult)); + } finally { + shutdownLock.readLock().unlock(); + } } /** @@ -390,7 +506,15 @@ * @param collector reference collector called back for each blob reference found */ public void collectBlobReferences(Consumer collector) throws IOException { - garbageCollector.collectBlobReferences(collector); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + garbageCollector.collectBlobReferences(collector); + } finally { + shutdownLock.readLock().unlock(); + } } /** @@ -404,13 +528,29 @@ @Override @Nonnull public SegmentWriter getWriter() { - return segmentWriter; + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return segmentWriter; + } finally { + shutdownLock.readLock().unlock(); + } } @Override @Nonnull public TarRevisions getRevisions() { - return revisions; + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return revisions; + } finally { + shutdownLock.readLock().unlock(); + } } @Override @@ -418,28 +558,39 @@ // Flag the store as shutting / shut down shutdown = true; - // avoid deadlocks by closing (and joining) the background - // thread before acquiring the synchronization lock - fileStoreScheduler.close(); - + shutdownLock.writeLock().lock(); try { - flush(); - } catch (IOException e) { - log.warn("Unable to flush the store", e); - } + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + + // avoid deadlocks by closing (and joining) the background + // thread before acquiring the synchronization lock + fileStoreScheduler.close(); - Closer closer = Closer.create(); - closer.register(revisions); - if (lock != null) { try { - lock.release(); + doFlush(); } catch (IOException e) { - log.warn("Unable to release the file lock", e); + log.warn("Unable to flush the store", e); } + + Closer closer = Closer.create(); + closer.register(revisions); + if (lock != null) { + try { + lock.release(); + } catch (IOException e) { + log.warn("Unable to release the file lock", e); + } + } + closer.register(lockFile); + closer.register(tarFiles); + closeAndLogOnFail(closer); + + shutdownComplete = true; + } finally { + shutdownLock.writeLock().unlock(); } - closer.register(lockFile); - closer.register(tarFiles); - closeAndLogOnFail(closer); // Try removing pending files in case the scheduler didn't have a chance to run yet System.gc(); // for any memory-mappings that are no longer used @@ -450,56 +601,71 @@ @Override public boolean containsSegment(SegmentId id) { - return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits()); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits()); + } finally { + shutdownLock.readLock().unlock(); + } } @Override @Nonnull public Segment readSegment(final SegmentId id) { + shutdownLock.readLock().lock(); try { - return segmentCache.getSegment(id, new Callable() { - @Override - public Segment call() throws Exception { - return readSegmentUncached(tarFiles, id); - } - }); + if (shutdownComplete) { + throw new IllegalStateException("already closed"); + } + return segmentCache.getSegment(id, () -> readSegmentUncached(tarFiles, id)); } catch (ExecutionException e) { SegmentNotFoundException snfe = asSegmentNotFoundException(e, id); snfeListener.notify(id, snfe); throw snfe; + } finally { + shutdownLock.readLock().unlock(); } } @Override public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) throws IOException { - Segment segment = null; - - // If the segment is a data segment, create a new instance of Segment to - // access some internal information stored in the segment and to store - // in an in-memory cache for later use. - - GCGeneration generation = GCGeneration.NULL; - Set references = null; - Set binaryReferences = null; - - if (id.isDataSegmentId()) { - ByteBuffer data; - - if (offset > 4096) { - data = ByteBuffer.allocate(length); - data.put(buffer, offset, length); - data.rewind(); - } else { - data = ByteBuffer.wrap(buffer, offset, length); + shutdownLock.readLock().lock(); + try { + if (shutdownComplete) { + throw new IllegalStateException("already closed"); } - segment = new Segment(tracker, segmentReader, id, data); - generation = segment.getGcGeneration(); - references = readReferences(segment); - binaryReferences = readBinaryReferences(segment); - } + Segment segment = null; + + // If the segment is a data segment, create a new instance of Segment to + // access some internal information stored in the segment and to store + // in an in-memory cache for later use. + + GCGeneration generation = GCGeneration.NULL; + Set references = null; + Set binaryReferences = null; + + if (id.isDataSegmentId()) { + ByteBuffer data; + + if (offset > 4096) { + data = ByteBuffer.allocate(length); + data.put(buffer, offset, length); + data.rewind(); + } else { + data = ByteBuffer.wrap(buffer, offset, length); + } - tarFiles.writeSegment( + segment = new Segment(tracker, segmentReader, id, data); + generation = segment.getGcGeneration(); + references = readReferences(segment); + binaryReferences = readBinaryReferences(segment); + } + + tarFiles.writeSegment( id.asUUID(), buffer, offset, @@ -507,11 +673,14 @@ generation, references, binaryReferences - ); + ); - // Keep this data segment in memory as it's likely to be accessed soon. - if (segment != null) { - segmentCache.putSegment(segment); + // Keep this data segment in memory as it's likely to be accessed soon. + if (segment != null) { + segmentCache.putSegment(segment); + } + } finally { + shutdownLock.readLock().unlock(); } }