Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java	(date 1442916492000)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java	(date 1442319867000)
@@ -23,6 +23,7 @@
 import static com.google.common.collect.Lists.newArrayListWithCapacity;
 import static com.google.common.collect.Lists.newLinkedList;
 import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Maps.newLinkedHashMap;
 import static com.google.common.collect.Sets.newHashSet;
 import static java.lang.String.format;
 import static java.util.Collections.emptyMap;
@@ -55,8 +56,6 @@
 import javax.annotation.Nonnull;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Maps;
-
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
@@ -161,7 +160,7 @@
      * not be removed immediately, because they first need to be closed, and the
      * JVM needs to release the memory mapped file references.
      */
-    private final LinkedList<File> toBeRemoved = newLinkedList();
+    private final List<File> pendingRemove = newLinkedList();
 
     /**
      * Version of the segment storage format.
@@ -653,6 +652,7 @@
         RecordId before = persistedHead.get();
         RecordId after = head.get();
         boolean cleanup = cleanupNeeded.getAndSet(false);
+
         if (cleanup || !after.equals(before)) {
             // needs to happen outside the synchronization block below to
             // avoid a deadlock with another thread flushing the writer
@@ -677,24 +677,23 @@
                         // otherwise they would block cleanup. See OAK-3347
                         before = null;
                         after = null;
-                        cleanup();
+                        pendingRemove.addAll(cleanup());
                     }
                 }
-                synchronized (this) {
-                    // remove all obsolete tar generations
-                    Iterator<File> iterator = toBeRemoved.iterator();
-                    while (iterator.hasNext()) {
-                        File file = iterator.next();
-                        log.debug("TarMK GC: Attempting to remove old file {}",
-                                file);
-                        if (!file.exists() || file.delete()) {
-                            log.debug("TarMK GC: Removed old file {}", file);
-                            iterator.remove();
-                        }
-                    }
-                }
+
+                // remove all obsolete tar generations
+                Iterator<File> iterator = pendingRemove.iterator();
+                while (iterator.hasNext()) {
+                    File file = iterator.next();
+                    log.debug("TarMK GC: Attempting to remove old file {}",
+                            file);
+                    if (!file.exists() || file.delete()) {
+                        log.debug("TarMK GC: Removed old file {}", file);
+                        iterator.remove();
+                    }
+                }
             }
         }
     }
     /**
      * Runs garbage collection on the segment level, which could write new
@@ -704,11 +703,11 @@
      * A new generation of a tar file is created (and segments are only
      * discarded) if doing so releases more than 25% of the space in a tar file.
      */
-    public void cleanup() throws IOException {
+    public List<File> cleanup() throws IOException {
         Stopwatch watch = Stopwatch.createStarted();
         long initialSize = size();
-        CompactionMap cm = tracker.getCompactionMap();
-        Set<UUID> cleanedIds = newHashSet();
+        Set<UUID> referencedIds = newHashSet();
+        Map<TarReader, TarReader> cleaned = newLinkedHashMap();
         synchronized (this) {
             gcMonitor.info("TarMK revision cleanup started. Current repository size {}",
                     humanReadableByteCount(initialSize));
@@ -721,33 +720,57 @@
             // to clear stale weak references in the SegmentTracker
             System.gc();
 
-            Set<UUID> ids = newHashSet();
             for (SegmentId id : tracker.getReferencedSegmentIds()) {
-                ids.add(new UUID(
+                referencedIds.add(new UUID(
                         id.getMostSignificantBits(),
                         id.getLeastSignificantBits()));
             }
-            writer.collectReferences(ids);
+            writer.collectReferences(referencedIds);
+            for (TarReader reader : readers) {
+                cleaned.put(reader, null);
+            }
+        }
 
-            List<TarReader> list = newArrayListWithCapacity(readers.size());
+        // Do actual cleanup outside of the lock to prevent blocking
+        // concurrent writers for a long time
+        CompactionMap cm = tracker.getCompactionMap();
+        LinkedList<File> toRemove = newLinkedList();
+        Set<UUID> cleanedIds = newHashSet();
+        for (TarReader reader : cleaned.keySet()) {
+            TarReader newReader = reader.cleanup(referencedIds, cm, cleanedIds);
+            cleaned.put(reader, newReader);
+        }
+
+        List<TarReader> oldReaders = newArrayList();
+        synchronized (this) {
+            // Replace current list of readers with the cleaned readers taking care not to lose
+            // any new reader that might have come in through concurrent calls to newWriter()
+            List<TarReader> newReaders = newArrayList();
             for (TarReader reader : readers) {
-                TarReader cleaned = reader.cleanup(ids, cm, cleanedIds);
-                if (cleaned == reader) {
-                    list.add(reader);
+                if (cleaned.containsKey(reader)) {
+                    TarReader newReader = cleaned.get(reader);
+                    if (newReader != null) {
+                        newReaders.add(newReader);
+                    }
+                    if (newReader != reader) {
+                        oldReaders.add(reader);
+                    }
                 } else {
-                    if (cleaned != null) {
-                        list.add(cleaned);
-                    }
-                    closeAndLogOnFail(reader);
-                    File file = reader.getFile();
-                    gcMonitor.info("TarMK revision cleanup reclaiming {}", file.getName());
-                    toBeRemoved.addLast(file);
+                    newReaders.add(reader);
                 }
             }
-            readers = list;
+            readers = newReaders;
         }
 
-        // Do this outside sync to avoid deadlock with SegmentId.getSegment(). See OAK-3179
+        // Close old readers *after* setting readers to the new readers to avoid accessing
+        // a closed reader from readSegment()
+        for (TarReader oldReader : oldReaders) {
+            closeAndLogOnFail(oldReader);
+            File file = oldReader.getFile();
+            gcMonitor.info("TarMK revision cleanup reclaiming {}", file.getName());
+            toRemove.addLast(file);
+        }
+
         cm.remove(cleanedIds);
         long finalSize = size();
         gcMonitor.cleaned(initialSize - finalSize, finalSize);
@@ -757,6 +780,7 @@
                 humanReadableByteCount(initialSize - finalSize),
                 humanReadableByteCount(sum(cm.getEstimatedWeights())),
                 cm.getDepth());
+        return toRemove;
     }
 
     /**
@@ -1067,7 +1091,7 @@
     public Map<UUID, List<UUID>> getTarGraph(String fileName) throws IOException {
         for (TarReader reader : readers) {
             if (fileName.equals(reader.getFile().getName())) {
-                Map<UUID, List<UUID>> graph = Maps.newHashMap();
+                Map<UUID, List<UUID>> graph = newHashMap();
                 for (UUID uuid : reader.getUUIDs()) {
                     graph.put(uuid, null);
                 }
@@ -1132,7 +1156,7 @@
         public void flush() { /* nop */ }
 
         @Override
-        public synchronized void cleanup() {
+        public synchronized LinkedList<File> cleanup() {
             throw new UnsupportedOperationException("Read Only Store");
        }
 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/package-info.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/package-info.java	(date 1442916492000)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/package-info.java	(date 1442319867000)
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("2.1.0")
+@Version("2.2.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.plugins.segment.file;
 
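
For review context, here is a minimal, compilable sketch of the lock-split pattern the FileStore.cleanup() change introduces: snapshot the readers under the monitor, run the expensive per-reader cleanup outside it, swap the results back in under the monitor while keeping readers that arrived concurrently, and close superseded readers only after the swap. The names ReaderCleanupSketch, Reader and rewrite-style cleanup() are simplified stand-ins for the Oak types, not Oak API.

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReaderCleanupSketch {

    interface Reader extends Closeable {
        /** Returns this (unchanged), a smaller replacement, or null if nothing survives. */
        Reader cleanup();
    }

    private List<Reader> readers = new ArrayList<>();

    List<Reader> cleanup() throws IOException {
        // 1. Snapshot the current readers while holding the lock.
        //    LinkedHashMap mirrors newLinkedHashMap() in the patch: it keeps reader order.
        Map<Reader, Reader> cleaned = new LinkedHashMap<>();
        synchronized (this) {
            for (Reader reader : readers) {
                cleaned.put(reader, null);
            }
        }

        // 2. Expensive work happens without the lock so concurrent writers are
        //    not blocked. put() on an existing key is not a structural
        //    modification, so iterating keySet() while updating values is legal.
        for (Reader reader : cleaned.keySet()) {
            cleaned.put(reader, reader.cleanup());
        }

        // 3. Swap the results in under the lock. Readers that appeared
        //    concurrently are absent from the snapshot and are kept as-is.
        List<Reader> oldReaders = new ArrayList<>();
        synchronized (this) {
            List<Reader> newReaders = new ArrayList<>();
            for (Reader reader : readers) {
                if (cleaned.containsKey(reader)) {
                    Reader newReader = cleaned.get(reader);
                    if (newReader != null) {
                        newReaders.add(newReader);
                    }
                    if (newReader != reader) {
                        oldReaders.add(reader); // superseded; close after the swap
                    }
                } else {
                    newReaders.add(reader); // concurrent newcomer; keep
                }
            }
            readers = newReaders;
        }

        // 4. Close superseded readers only after the swap, so no other thread
        //    can still pick a closed reader out of the list.
        for (Reader oldReader : oldReaders) {
            oldReader.close();
        }
        // Hand the superseded readers back so the caller can queue their
        // backing files for deferred deletion, as the patch does with List<File>.
        return oldReaders;
    }
}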
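A companion sketch of the deferred-delete queue behind the new pendingRemove field: tar files returned by cleanup() may still be memory mapped, and File.delete() fails while a mapping is live (notably on Windows), so every flush retries the deletion and keeps whatever still refuses to go. FileDeletionQueue is a made-up wrapper assumed here for illustration; the patch keeps the list directly in FileStore.

import java.io.File;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class FileDeletionQueue {

    private final List<File> pendingRemove = new LinkedList<>();

    /** Called with the files an earlier cleanup() superseded. */
    void enqueue(List<File> files) {
        pendingRemove.addAll(files);
    }

    /** Called on each flush: delete what we can, keep the rest queued. */
    void removeObsolete() {
        Iterator<File> iterator = pendingRemove.iterator();
        while (iterator.hasNext()) {
            File file = iterator.next();
            // Either the file is already gone or the delete succeeded;
            // otherwise leave it queued for the next attempt.
            if (!file.exists() || file.delete()) {
                iterator.remove();
            }
        }
    }
}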