Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java	(revision 1808972)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java	(working copy)
@@ -56,7 +56,6 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,6 +84,7 @@
 import org.apache.jackrabbit.oak.segment.WriterCacheManager;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry;
+import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser;
 import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
@@ -157,13 +157,10 @@
      */
     private final AtomicBoolean sufficientMemory = new AtomicBoolean(true);
 
-    /**
-     * Flag signalling shutdown of the file store
-     */
-    private volatile boolean shutdown;
-
     private final FileStoreStats stats;
 
+    private final ShutDown shutDown = new ShutDown();
+
     @Nonnull
     private final SegmentNotFoundExceptionListener snfeListener;
 
@@ -211,7 +208,7 @@
             new Runnable() {
                 @Override
                 public void run() {
-                    if (shutdown) {
+                    if (shutDown.shutDownRequested()) {
                         return;
                     }
                     try {
@@ -245,9 +242,11 @@
     }
 
     FileStore bind(TarRevisions revisions) throws IOException {
-        this.revisions = revisions;
-        this.revisions.bind(this, tracker, initialNode());
-        return this;
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            this.revisions = revisions;
+            this.revisions.bind(this, tracker, initialNode());
+            return this;
+        }
     }
 
     @Nonnull
@@ -281,7 +280,7 @@
      */
     public Runnable getGCRunner() {
         return new SafeRunnable(format("TarMK revision gc [%s]", directory), () -> {
-            try {
+            try (ShutDownCloser ignored = shutDown.keepAlive()) {
                 garbageCollector.run();
             } catch (IOException e) {
                 log.error("Error running revision garbage collection", e);
@@ -300,44 +299,55 @@
      * @return the size of this store.
      */
     private long size() {
-        return tarFiles.size();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return tarFiles.size();
+        }
     }
 
-    public int readerCount(){
-        return tarFiles.readerCount();
+    public int readerCount() {
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return tarFiles.readerCount();
+        }
     }
 
     public FileStoreStats getStats() {
         return stats;
     }
 
-    public void flush() throws IOException {
+    private void doFlush() throws IOException {
         if (revisions == null) {
             return;
         }
-        revisions.flush(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                segmentWriter.flush();
-                tarFiles.flush();
-                stats.flushed();
-                return null;
-            }
+        revisions.flush(() -> {
+            segmentWriter.flush();
+            tarFiles.flush();
+            stats.flushed();
+            return null;
         });
     }
 
+    public void flush() throws IOException {
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            doFlush();
+        }
+    }
+
     /**
      * Run full garbage collection: estimation, compaction, cleanup.
      */
     public void fullGC() throws IOException {
-        garbageCollector.runFull();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            garbageCollector.runFull();
+        }
     }
 
     /**
      * Run tail garbage collection.
      */
     public void tailGC() throws IOException {
-        garbageCollector.runTail();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            garbageCollector.runTail();
+        }
     }
 
     /**
@@ -345,7 +355,9 @@
      * @return
      */
     public GCEstimation estimateCompactionGain() {
-        return garbageCollector.estimateCompactionGain();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return garbageCollector.estimateCompactionGain();
+        }
     }
 
     /**
@@ -355,11 +367,15 @@
      * @return {@code true} on success, {@code false} otherwise.
      */
     public boolean compactFull() {
-        return garbageCollector.compactFull().isSuccess();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return garbageCollector.compactFull().isSuccess();
+        }
     }
 
     public boolean compactTail() {
-        return garbageCollector.compactTail().isSuccess();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return garbageCollector.compactTail().isSuccess();
+        }
     }
 
     /**
@@ -370,11 +386,13 @@
      * skipping the reclaimed segments.
      */
     public void cleanup() throws IOException {
-        CompactionResult compactionResult = CompactionResult.skipped(
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            fileReaper.add(garbageCollector.cleanup(CompactionResult.skipped(
                 getGcGeneration(),
                 garbageCollector.gcOptions,
-                revisions.getHead());
-        fileReaper.add(garbageCollector.cleanup(compactionResult));
+                revisions.getHead()
+            )));
+        }
     }
 
     /**
@@ -390,7 +408,9 @@
      * @param collector reference collector called back for each blob reference found
     */
     public void collectBlobReferences(Consumer<String> collector) throws IOException {
-        garbageCollector.collectBlobReferences(collector);
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            garbageCollector.collectBlobReferences(collector);
+        }
     }
 
     /**
@@ -404,42 +424,45 @@
     @Override
     @Nonnull
     public SegmentWriter getWriter() {
-        return segmentWriter;
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return segmentWriter;
+        }
     }
 
     @Override
     @Nonnull
     public TarRevisions getRevisions() {
-        return revisions;
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return revisions;
+        }
     }
 
     @Override
     public void close() {
-        // Flag the store as shutting / shut down
-        shutdown = true;
-
-        // avoid deadlocks by closing (and joining) the background
-        // thread before acquiring the synchronization lock
-        fileStoreScheduler.close();
-
-        try {
-            flush();
-        } catch (IOException e) {
-            log.warn("Unable to flush the store", e);
-        }
+        try (ShutDownCloser ignored = shutDown.shutDown()) {
+            // avoid deadlocks by closing (and joining) the background
+            // thread before acquiring the synchronization lock
+            fileStoreScheduler.close();
 
-        Closer closer = Closer.create();
-        closer.register(revisions);
-        if (lock != null) {
             try {
-                lock.release();
+                doFlush();
             } catch (IOException e) {
-                log.warn("Unable to release the file lock", e);
+                log.warn("Unable to flush the store", e);
+            }
+
+            Closer closer = Closer.create();
+            closer.register(revisions);
+            if (lock != null) {
+                try {
+                    lock.release();
+                } catch (IOException e) {
+                    log.warn("Unable to release the file lock", e);
+                }
             }
+            closer.register(lockFile);
+            closer.register(tarFiles);
+            closeAndLogOnFail(closer);
         }
-        closer.register(lockFile);
-        closer.register(tarFiles);
-        closeAndLogOnFail(closer);
 
         // Try removing pending files in case the scheduler didn't have a chance to run yet
         System.gc(); // for any memory-mappings that are no longer used
@@ -450,19 +473,16 @@
 
     @Override
     public boolean containsSegment(SegmentId id) {
-        return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        }
     }
 
     @Override
     @Nonnull
     public Segment readSegment(final SegmentId id) {
-        try {
-            return segmentCache.getSegment(id, new Callable<Segment>() {
-                @Override
-                public Segment call() throws Exception {
-                    return readSegmentUncached(tarFiles, id);
-                }
-            });
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return segmentCache.getSegment(id, () -> readSegmentUncached(tarFiles, id));
         } catch (ExecutionException e) {
             SegmentNotFoundException snfe = asSegmentNotFoundException(e, id);
             snfeListener.notify(id, snfe);
@@ -472,34 +492,35 @@
 
     @Override
     public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) throws IOException {
-        Segment segment = null;
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            Segment segment = null;
 
-        // If the segment is a data segment, create a new instance of Segment to
-        // access some internal information stored in the segment and to store
-        // in an in-memory cache for later use.
-
-        GCGeneration generation = GCGeneration.NULL;
-        Set<UUID> references = null;
-        Set<String> binaryReferences = null;
-
-        if (id.isDataSegmentId()) {
-            ByteBuffer data;
-
-            if (offset > 4096) {
-                data = ByteBuffer.allocate(length);
-                data.put(buffer, offset, length);
-                data.rewind();
-            } else {
-                data = ByteBuffer.wrap(buffer, offset, length);
-            }
+            // If the segment is a data segment, create a new instance of Segment to
+            // access some internal information stored in the segment and to store
+            // in an in-memory cache for later use.
+
+            GCGeneration generation = GCGeneration.NULL;
+            Set<UUID> references = null;
+            Set<String> binaryReferences = null;
+
+            if (id.isDataSegmentId()) {
+                ByteBuffer data;
+
+                if (offset > 4096) {
+                    data = ByteBuffer.allocate(length);
+                    data.put(buffer, offset, length);
+                    data.rewind();
+                } else {
+                    data = ByteBuffer.wrap(buffer, offset, length);
+                }
 
-            segment = new Segment(tracker, segmentReader, id, data);
-            generation = segment.getGcGeneration();
-            references = readReferences(segment);
-            binaryReferences = readBinaryReferences(segment);
-        }
+                segment = new Segment(tracker, segmentReader, id, data);
+                generation = segment.getGcGeneration();
+                references = readReferences(segment);
+                binaryReferences = readBinaryReferences(segment);
+            }
 
-        tarFiles.writeSegment(
+            tarFiles.writeSegment(
                 id.asUUID(),
                 buffer,
                 offset,
@@ -507,11 +528,12 @@
                 generation,
                 references,
                 binaryReferences
-        );
+            );
 
-        // Keep this data segment in memory as it's likely to be accessed soon.
-        if (segment != null) {
-            segmentCache.putSegment(segment);
+            // Keep this data segment in memory as it's likely to be accessed soon.
+            if (segment != null) {
+                segmentCache.putSegment(segment);
+            }
         }
     }
 
@@ -1116,7 +1138,7 @@
                 reason = "Not enough memory";
                 return true;
             }
-            if (store.shutdown) {
+            if (store.shutDown.shutDownRequested()) {
                 reason = "The FileStore is shutting down";
                 return true;
             }
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java	(nonexistent)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java	(working copy)
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+class ShutDown {
+
+    interface ShutDownCloser extends AutoCloseable {
+
+        void close();
+
+    }
+
+    private volatile boolean shutDownRequested;
+
+    private boolean shutDown;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    ShutDownCloser keepAlive() {
+        lock.readLock().lock();
+
+        if (shutDown) {
+            lock.readLock().unlock();
+            throw new IllegalStateException("already shut down");
+        }
+
+        return () -> lock.readLock().unlock();
+    }
+
+    ShutDownCloser shutDown() {
+        shutDownRequested = true;
+        lock.writeLock().lock();
+
+        if (shutDown) {
+            lock.writeLock().unlock();
+            throw new IllegalStateException("already shut down");
+        }
+
+        return () -> {
+            shutDown = true;
+            lock.writeLock().unlock();
+        };
+    }
+
+    boolean shutDownRequested() {
+        return shutDownRequested;
+    }
+
+}

Property changes on: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property