Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (revision 1808972)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (working copy)
@@ -56,11 +56,12 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import javax.annotation.CheckForNull;
@@ -158,10 +159,38 @@
private final AtomicBoolean sufficientMemory = new AtomicBoolean(true);
/**
- * Flag signalling shutdown of the file store
+ * Flag signalling shutdown of this instance. Operations responsive to
+ * shutdown can periodically monitor this flag and terminate cleanly and
+ * early if this flag is {@code true}.
*/
private volatile boolean shutdown;
+ /**
+ * Determines if this instance was successfully shut down. This variable is
+ * protected by the {@link #shutdownLock}.
+ */
+ private boolean shutdownComplete;
+
+ /**
+ * Protects the state shared between {@link #close()} and every other
+ * operation in this instance. Moreover, it allows to implement responsive,
+ * graceful shutdown of long-running operations.
+ *
+ * The shutdown protocol is as follows. When {@link #close()} is invoked
+ * {@link #shutdown} is immediately set to {@code true}. Then {@link
+ * #close()} tries to acquire this lock in exclusive mode to perform the
+ * shutdown of this instance. When the shutdown completes, {@link
+ * #shutdownComplete} is set to {@code true} and this lock is released.
+ *
+ * Every other publicly accessible method needs to acquire this lock in
+ * non-exclusive mode. Then the method can check {@link #shutdownComplete}
+ * to determine if this instance was already closed or it can continue with
+ * its execution. If the method is responsive to shutdown, it can check
+ * {@link #shutdown} and terminate early. When the method finished, the lock
+ * is released.
+ */
+ private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
+
private final FileStoreStats stats;
@Nonnull
@@ -245,9 +274,17 @@
}
FileStore bind(TarRevisions revisions) throws IOException {
- this.revisions = revisions;
- this.revisions.bind(this, tracker, initialNode());
- return this;
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ this.revisions = revisions;
+ this.revisions.bind(this, tracker, initialNode());
+ return this;
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
@Nonnull
@@ -281,10 +318,16 @@
*/
public Runnable getGCRunner() {
return new SafeRunnable(format("TarMK revision gc [%s]", directory), () -> {
+ shutdownLock.readLock().lock();
try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
garbageCollector.run();
} catch (IOException e) {
log.error("Error running revision garbage collection", e);
+ } finally {
+ shutdownLock.readLock().unlock();
}
});
}
@@ -300,44 +343,85 @@
* @return the size of this store.
*/
private long size() {
- return tarFiles.size();
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return tarFiles.size();
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
- public int readerCount(){
- return tarFiles.readerCount();
+ public int readerCount() {
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return tarFiles.readerCount();
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
public FileStoreStats getStats() {
return stats;
}
- public void flush() throws IOException {
+ private void doFlush() throws IOException {
if (revisions == null) {
return;
}
- revisions.flush(new Callable() {
- @Override
- public Void call() throws Exception {
- segmentWriter.flush();
- tarFiles.flush();
- stats.flushed();
- return null;
- }
+ revisions.flush(() -> {
+ segmentWriter.flush();
+ tarFiles.flush();
+ stats.flushed();
+ return null;
});
}
+ public void flush() throws IOException {
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ doFlush();
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
+ }
+
/**
* Run full garbage collection: estimation, compaction, cleanup.
*/
public void fullGC() throws IOException {
- garbageCollector.runFull();
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ garbageCollector.runFull();
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
/**
* Run tail garbage collection.
*/
public void tailGC() throws IOException {
- garbageCollector.runTail();
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ garbageCollector.runTail();
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
/**
@@ -345,7 +429,15 @@
* @return
*/
public GCEstimation estimateCompactionGain() {
- return garbageCollector.estimateCompactionGain();
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return garbageCollector.estimateCompactionGain();
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
/**
@@ -355,11 +447,27 @@
* @return {@code true} on success, {@code false} otherwise.
*/
public boolean compactFull() {
- return garbageCollector.compactFull().isSuccess();
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return garbageCollector.compactFull().isSuccess();
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
public boolean compactTail() {
- return garbageCollector.compactTail().isSuccess();
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return garbageCollector.compactTail().isSuccess();
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
/**
@@ -370,11 +478,19 @@
* skipping the reclaimed segments.
*/
public void cleanup() throws IOException {
- CompactionResult compactionResult = CompactionResult.skipped(
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ CompactionResult compactionResult = CompactionResult.skipped(
getGcGeneration(),
garbageCollector.gcOptions,
revisions.getHead());
- fileReaper.add(garbageCollector.cleanup(compactionResult));
+ fileReaper.add(garbageCollector.cleanup(compactionResult));
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
/**
@@ -390,7 +506,15 @@
* @param collector reference collector called back for each blob reference found
*/
public void collectBlobReferences(Consumer collector) throws IOException {
- garbageCollector.collectBlobReferences(collector);
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ garbageCollector.collectBlobReferences(collector);
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
/**
@@ -404,13 +528,29 @@
@Override
@Nonnull
public SegmentWriter getWriter() {
- return segmentWriter;
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return segmentWriter;
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
@Override
@Nonnull
public TarRevisions getRevisions() {
- return revisions;
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return revisions;
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
@Override
@@ -418,28 +558,39 @@
// Flag the store as shutting / shut down
shutdown = true;
- // avoid deadlocks by closing (and joining) the background
- // thread before acquiring the synchronization lock
- fileStoreScheduler.close();
-
+ shutdownLock.writeLock().lock();
try {
- flush();
- } catch (IOException e) {
- log.warn("Unable to flush the store", e);
- }
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+
+ // avoid deadlocks by closing (and joining) the background
+ // thread before acquiring the synchronization lock
+ fileStoreScheduler.close();
- Closer closer = Closer.create();
- closer.register(revisions);
- if (lock != null) {
try {
- lock.release();
+ doFlush();
} catch (IOException e) {
- log.warn("Unable to release the file lock", e);
+ log.warn("Unable to flush the store", e);
}
+
+ Closer closer = Closer.create();
+ closer.register(revisions);
+ if (lock != null) {
+ try {
+ lock.release();
+ } catch (IOException e) {
+ log.warn("Unable to release the file lock", e);
+ }
+ }
+ closer.register(lockFile);
+ closer.register(tarFiles);
+ closeAndLogOnFail(closer);
+
+ shutdownComplete = true;
+ } finally {
+ shutdownLock.writeLock().unlock();
}
- closer.register(lockFile);
- closer.register(tarFiles);
- closeAndLogOnFail(closer);
// Try removing pending files in case the scheduler didn't have a chance to run yet
System.gc(); // for any memory-mappings that are no longer used
@@ -450,56 +601,71 @@
@Override
public boolean containsSegment(SegmentId id) {
- return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
@Override
@Nonnull
public Segment readSegment(final SegmentId id) {
+ shutdownLock.readLock().lock();
try {
- return segmentCache.getSegment(id, new Callable() {
- @Override
- public Segment call() throws Exception {
- return readSegmentUncached(tarFiles, id);
- }
- });
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
+ }
+ return segmentCache.getSegment(id, () -> readSegmentUncached(tarFiles, id));
} catch (ExecutionException e) {
SegmentNotFoundException snfe = asSegmentNotFoundException(e, id);
snfeListener.notify(id, snfe);
throw snfe;
+ } finally {
+ shutdownLock.readLock().unlock();
}
}
@Override
public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) throws IOException {
- Segment segment = null;
-
- // If the segment is a data segment, create a new instance of Segment to
- // access some internal information stored in the segment and to store
- // in an in-memory cache for later use.
-
- GCGeneration generation = GCGeneration.NULL;
- Set references = null;
- Set binaryReferences = null;
-
- if (id.isDataSegmentId()) {
- ByteBuffer data;
-
- if (offset > 4096) {
- data = ByteBuffer.allocate(length);
- data.put(buffer, offset, length);
- data.rewind();
- } else {
- data = ByteBuffer.wrap(buffer, offset, length);
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdownComplete) {
+ throw new IllegalStateException("already closed");
}
- segment = new Segment(tracker, segmentReader, id, data);
- generation = segment.getGcGeneration();
- references = readReferences(segment);
- binaryReferences = readBinaryReferences(segment);
- }
+ Segment segment = null;
+
+ // If the segment is a data segment, create a new instance of Segment to
+ // access some internal information stored in the segment and to store
+ // in an in-memory cache for later use.
+
+ GCGeneration generation = GCGeneration.NULL;
+ Set references = null;
+ Set binaryReferences = null;
+
+ if (id.isDataSegmentId()) {
+ ByteBuffer data;
+
+ if (offset > 4096) {
+ data = ByteBuffer.allocate(length);
+ data.put(buffer, offset, length);
+ data.rewind();
+ } else {
+ data = ByteBuffer.wrap(buffer, offset, length);
+ }
- tarFiles.writeSegment(
+ segment = new Segment(tracker, segmentReader, id, data);
+ generation = segment.getGcGeneration();
+ references = readReferences(segment);
+ binaryReferences = readBinaryReferences(segment);
+ }
+
+ tarFiles.writeSegment(
id.asUUID(),
buffer,
offset,
@@ -507,11 +673,14 @@
generation,
references,
binaryReferences
- );
+ );
- // Keep this data segment in memory as it's likely to be accessed soon.
- if (segment != null) {
- segmentCache.putSegment(segment);
+ // Keep this data segment in memory as it's likely to be accessed soon.
+ if (segment != null) {
+ segmentCache.putSegment(segment);
+ }
+ } finally {
+ shutdownLock.readLock().unlock();
}
}