diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
index 27d32d587f..79d7afc970 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Maps.newHashMap;
 
@@ -25,8 +26,12 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -34,6 +39,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
 import com.google.common.base.Supplier;
+import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
 import org.apache.jackrabbit.oak.segment.RecordType;
@@ -46,6 +52,7 @@ import org.apache.jackrabbit.oak.segment.SegmentId;
 import org.apache.jackrabbit.oak.segment.SegmentIdFactory;
 import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentReader;
 import org.apache.jackrabbit.oak.segment.SegmentStore;
 import org.apache.jackrabbit.oak.segment.SegmentTracker;
@@ -76,11 +83,14 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable {
      */
     static final int CURRENT_STORE_VERSION = 1;
 
-    private static final Pattern FILE_NAME_PATTERN =
-            Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
-
     static final String FILE_NAME_FORMAT = "data%05d%s.tar";
 
+    protected static boolean notEmptyDirectory(File path) {
+        Collection<File> entries = FileUtils.listFiles(path, new String[] {"tar"}, false);
+        checkArgument(entries != null, "%s is not a directory, or an I/O error occurred", path);
+        return entries.size() > 0;
+    }
+
     @Nonnull
     final SegmentTracker tracker;
 
@@ -127,7 +137,14 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable {
         this.ioMonitor = builder.getIOMonitor();
     }
 
-    File getManifestFile() {
+    static SegmentNotFoundException asSegmentNotFoundException(ExecutionException e, SegmentId id) {
+        if (e.getCause() instanceof SegmentNotFoundException) {
+            return (SegmentNotFoundException) e.getCause();
+        }
+        return new SegmentNotFoundException(id, e);
+    }
+
+    File getManifestFile() {
         return new File(directory, MANIFEST_FILE_NAME);
     }
 
@@ -182,35 +199,6 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable {
         return segmentReader.getTemplateCacheStats();
     }
 
-    static Map<Integer, Map<Character, File>> collectFiles(File directory) {
-        Map<Integer, Map<Character, File>> dataFiles = newHashMap();
-
-        for (File file : listFiles(directory)) {
-            Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName());
-            if (matcher.matches()) {
-                Integer index = Integer.parseInt(matcher.group(2));
-                Map<Character, File> files = dataFiles.get(index);
-                if (files == null) {
-                    files = newHashMap();
-                    dataFiles.put(index, files);
-                }
-                Character generation = 'a';
-                if (matcher.group(4) != null) {
-                    generation = matcher.group(4).charAt(0);
-                }
-                checkState(files.put(generation, file) == null);
-            }
-        }
-
-        return dataFiles;
-    }
-
-    @Nonnull
-    private static File[] listFiles(File directory) {
-        File[] files = directory.listFiles();
-        return files == null ? new File[] {} : files;
-    }
-
     @Nonnull
     public abstract SegmentWriter getWriter();
 
@@ -285,6 +273,29 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable {
         });
     }
 
+    static Set<UUID> readReferences(Segment segment) {
+        Set<UUID> references = new HashSet<>();
+        for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
+            references.add(segment.getReferencedSegmentId(i));
+        }
+        return references;
+    }
+
+    static Set<String> readBinaryReferences(final Segment segment) {
+        final Set<String> binaryReferences = new HashSet<>();
+        segment.forEachRecord(new RecordConsumer() {
+
+            @Override
+            public void consume(int number, RecordType type, int offset) {
+                if (type == RecordType.BLOB_ID) {
+                    binaryReferences.add(SegmentBlob.readBlobId(segment, number));
+                }
+            }
+
+        });
+        return binaryReferences;
+    }
+
     static void closeAndLogOnFail(Closeable closeable) {
         if (closeable != null) {
             try {
@@ -296,4 +307,12 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable {
         }
     }
 
+    Segment readSegmentUncached(TarFiles tarFiles, SegmentId id) {
+        ByteBuffer buffer = tarFiles.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        if (buffer == null) {
+            throw new SegmentNotFoundException(id);
+        }
+        return new Segment(tracker, segmentReader, id, buffer);
+    }
+
 }
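Review note: notEmptyDirectory() replaces the collectFiles()-based emptiness probe with a plain Commons IO extension scan. One caveat when reading the guard above: FileUtils.listFiles throws IllegalArgumentException for a non-directory instead of returning null, so the checkArgument is purely defensive. A minimal, self-contained sketch of the same check (the directory name is a placeholder):

    import java.io.File;
    import java.util.Collection;
    import org.apache.commons.io.FileUtils;

    public class TarDirCheck {
        public static void main(String[] args) {
            File dir = new File("segmentstore"); // hypothetical store directory
            // Non-recursive scan for *.tar entries, as in notEmptyDirectory()
            Collection<File> tars = FileUtils.listFiles(dir, new String[] {"tar"}, false);
            System.out.println("existing store: " + !tars.isEmpty());
        }
    }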
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
index a9b0d4f444..5ff02a5c98 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
@@ -18,10 +18,6 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newLinkedHashMap;
 import static com.google.common.collect.Sets.newHashSet;
 import static java.lang.Integer.getInteger;
 import static java.lang.String.format;
@@ -48,10 +44,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
-import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -59,8 +52,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -72,7 +63,6 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
 import com.google.common.io.Closer;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.segment.Compactor;
@@ -86,6 +76,7 @@ import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.segment.WriterCacheManager;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry;
+import org.apache.jackrabbit.oak.segment.file.TarFiles.CleanupResult;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
@@ -95,6 +86,7 @@ import org.slf4j.LoggerFactory;
  * The storage implementation for tar files.
  */
 public class FileStore extends AbstractFileStore {
+
     private static final Logger log = LoggerFactory.getLogger(FileStore.class);
 
     /**
@@ -121,9 +113,7 @@ public class FileStore extends AbstractFileStore {
     @Nonnull
     private final GarbageCollector garbageCollector;
 
-    private volatile List<TarReader> readers;
-
-    private volatile TarWriter tarWriter;
+    private final TarFiles tarFiles;
 
     private final RandomAccessFile lockFile;
 
@@ -160,8 +150,6 @@ public class FileStore extends AbstractFileStore {
      */
    private volatile boolean shutdown;
 
-    private final ReadWriteLock fileStoreLock = new ReentrantReadWriteLock();
-
     private final FileStoreStats stats;
 
     @Nonnull
@@ -192,30 +180,25 @@ public class FileStore extends AbstractFileStore {
         this.garbageCollector = new GarbageCollector(
                 builder.getGcOptions(), builder.getGcListener(), new GCJournal(directory), builder.getCacheManager());
 
-        Map<Integer, Map<Character, File>> map = collectFiles(directory);
-
         Manifest manifest = Manifest.empty();
 
-        if (!map.isEmpty()) {
+        if (notEmptyDirectory(directory)) {
             manifest = checkManifest(openManifest());
         }
 
         saveManifest(manifest);
 
-        this.readers = newArrayListWithCapacity(map.size());
-        Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
-        Arrays.sort(indices);
-        for (int i = indices.length - 1; i >= 0; i--) {
-            readers.add(TarReader.open(map.get(indices[i]), memoryMapping, recovery, ioMonitor));
-        }
-
-        this.stats = new FileStoreStats(builder.getStatsProvider(), this, size());
+        this.stats = new FileStoreStats(builder.getStatsProvider(), this, 0);
+        this.tarFiles = TarFiles.builder()
+                .withDirectory(directory)
+                .withMemoryMapping(memoryMapping)
+                .withTarRecovery(recovery)
+                .withIOMonitor(ioMonitor)
+                .withFileStoreStats(stats)
+                .withMaxFileSize(maxFileSize)
+                .build();
+        this.stats.init(this.tarFiles.size());
 
-        int writeNumber = 0;
-        if (indices.length > 0) {
-            writeNumber = indices[indices.length - 1] + 1;
-        }
-        this.tarWriter = new TarWriter(directory, stats, writeNumber, ioMonitor);
-
         this.snfeListener = builder.getSnfeListener();
 
         fileStoreScheduler.scheduleAtFixedRate(
@@ -253,7 +236,7 @@ public class FileStore extends AbstractFileStore {
             }
         });
         log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
-        log.debug("TarMK readers {}", this.readers);
+        log.debug("TAR files: {}", tarFiles);
     }
 
     FileStore bind(TarRevisions revisions) throws IOException {
@@ -309,36 +292,14 @@ public class FileStore extends AbstractFileStore {
     }
 
     /**
-     * @return the size of this store. This method shouldn't be called from
-     * a very tight loop as it contents with the {@link #fileStoreLock}.
+     * @return the size of this store.
      */
     private long size() {
-        List<TarReader> readersSnapshot;
-        long writeFileSnapshotSize;
-
-        fileStoreLock.readLock().lock();
-        try {
-            readersSnapshot = ImmutableList.copyOf(readers);
-            writeFileSnapshotSize = tarWriter != null ? tarWriter.fileLength() : 0;
-        } finally {
-            fileStoreLock.readLock().unlock();
-        }
-
-        long size = writeFileSnapshotSize;
-        for (TarReader reader : readersSnapshot) {
-            size += reader.size();
-        }
-
-        return size;
+        return tarFiles.size();
     }
 
     public int readerCount(){
-        fileStoreLock.readLock().lock();
-        try {
-            return readers.size();
-        } finally {
-            fileStoreLock.readLock().unlock();
-        }
+        return tarFiles.readerCount();
     }
 
     public FileStoreStats getStats() {
@@ -353,9 +314,8 @@ public class FileStore extends AbstractFileStore {
             @Override
             public Void call() throws Exception {
                 segmentWriter.flush();
-                tarWriter.flush();
+                tarFiles.flush();
                 stats.flushed();
-
                 return null;
             }
         });
@@ -453,27 +413,15 @@ public class FileStore extends AbstractFileStore {
         Closer closer = Closer.create();
         closer.register(revisions);
 
-        fileStoreLock.writeLock().lock();
-        try {
-            if (lock != null) {
-                try {
-                    lock.release();
-                } catch (IOException e) {
-                    log.warn("Unable to release the file lock", e);
-                }
-            }
-            closer.register(lockFile);
-
-            List<TarReader> list = readers;
-            readers = newArrayList();
-            for (TarReader reader : list) {
-                closer.register(reader);
+        if (lock != null) {
+            try {
+                lock.release();
+            } catch (IOException e) {
+                log.warn("Unable to release the file lock", e);
             }
-
-            closer.register(tarWriter);
-        } finally {
-            fileStoreLock.writeLock().unlock();
         }
+        closer.register(lockFile);
+        closer.register(tarFiles);
 
         closeAndLogOnFail(closer);
 
         // Try removing pending files in case the scheduler didn't have a chance to run yet
@@ -485,24 +433,7 @@ public class FileStore extends AbstractFileStore {
 
     @Override
     public boolean containsSegment(SegmentId id) {
-        if (FileStoreUtil.containSegment(readers, id)) {
-            return true;
-        }
-
-        if (tarWriter != null) {
-            fileStoreLock.readLock().lock();
-            try {
-                if (tarWriter.containsEntry(id.getMostSignificantBits(), id.getLeastSignificantBits())) {
-                    return true;
-                }
-            } finally {
-                fileStoreLock.readLock().unlock();
-            }
-        }
-
-        // the writer might have switched to a new file,
-        // so we need to re-check the readers
-        return FileStoreUtil.containSegment(readers, id);
+        return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
     }
 
     @Override
@@ -512,38 +443,7 @@ public class FileStore extends AbstractFileStore {
         return segmentCache.getSegment(id, new Callable<Segment>() {
             @Override
             public Segment call() throws Exception {
-                ByteBuffer buffer = FileStoreUtil.readEntry(readers, id);
-                if (buffer != null) {
-                    return new Segment(tracker, segmentReader, id, buffer);
-                }
-
-                if (tarWriter != null) {
-                    fileStoreLock.readLock().lock();
-                    try {
-                        try {
-                            buffer = tarWriter.readEntry(id.getMostSignificantBits(), id.getLeastSignificantBits());
-                            if (buffer != null) {
-                                return new Segment(tracker, segmentReader, id, buffer);
-                            }
-                        } catch (IOException e) {
-                            log.warn("Failed to read from tar file {}", tarWriter, e);
-                        }
-                    } finally {
-                        fileStoreLock.readLock().unlock();
-                    }
-                }
-
-                // The TarWriter might have become a TarReader in the
-                // meantime. Moreover, the TarWriter that became a TarReader
-                // might have additional entries. Because of this, we need
-                // to check the list of TarReaders once more.
-
-                buffer = FileStoreUtil.readEntry(readers, id);
-                if (buffer != null) {
-                    return new Segment(tracker, segmentReader, id, buffer);
-                }
-
-                throw new SegmentNotFoundException(id);
+                return readSegmentUncached(tarFiles, id);
             }
         });
     } catch (ExecutionException e) {
@@ -553,13 +453,6 @@ public class FileStore extends AbstractFileStore {
         }
     }
 
-    private static SegmentNotFoundException asSegmentNotFoundException(ExecutionException e, SegmentId id) {
-        if (e.getCause() instanceof SegmentNotFoundException) {
-            return (SegmentNotFoundException) e.getCause();
-        }
-        return new SegmentNotFoundException(id, e);
-    }
-
     @Override
     public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) throws IOException {
         Segment segment = null;
@@ -569,6 +462,9 @@ public class FileStore extends AbstractFileStore {
         // in an in-memory cache for later use.
 
         int generation = 0;
+        Set<UUID> references = null;
+        Set<String> binaryReferences = null;
+
         if (id.isDataSegmentId()) {
             ByteBuffer data;
 
@@ -582,63 +478,26 @@ public class FileStore extends AbstractFileStore {
             segment = new Segment(tracker, segmentReader, id, data);
             generation = segment.getGcGeneration();
+            references = readReferences(segment);
+            binaryReferences = readBinaryReferences(segment);
         }
 
-        fileStoreLock.writeLock().lock();
-        try {
-            // Flush the segment to disk
-
-            long size = tarWriter.writeEntry(
-                    id.getMostSignificantBits(),
-                    id.getLeastSignificantBits(),
-                    buffer,
-                    offset,
-                    length,
-                    generation
-            );
-
-            // If the segment is a data segment, update the graph before
-            // (potentially) flushing the TAR file.
-
-            if (segment != null) {
-                populateTarGraph(segment, tarWriter);
-                populateTarBinaryReferences(segment, tarWriter);
-            }
-
-            // Close the TAR file if the size exceeds the maximum.
-
-            if (size >= maxFileSize) {
-                newWriter();
-            }
-        } finally {
-            fileStoreLock.writeLock().unlock();
-        }
+        tarFiles.writeSegment(
+                id.asUUID(),
+                buffer,
+                offset,
+                length,
+                generation,
+                references,
+                binaryReferences
+        );
 
         // Keep this data segment in memory as it's likely to be accessed soon.
-
         if (segment != null) {
             segmentCache.putSegment(segment);
         }
     }
 
-    /**
-     * Switch to a new tar writer.
-     * This method may only be called when holding the write lock of {@link #fileStoreLock}
-     * @throws IOException
-     */
-    private void newWriter() throws IOException {
-        TarWriter newWriter = tarWriter.createNextGeneration();
-        if (newWriter != tarWriter) {
-            File writeFile = tarWriter.getFile();
-            List<TarReader> list =
-                    newArrayListWithCapacity(1 + readers.size());
-            list.add(TarReader.open(writeFile, memoryMapping, ioMonitor));
-            list.addAll(readers);
-            readers = list;
-            tarWriter = newWriter;
-        }
-    }
-
     private void checkDiskSpace(SegmentGCOptions gcOptions) {
         long repositoryDiskSpace = size();
         long availableDiskSpace = directory.getFreeSpace();
@@ -943,99 +802,28 @@ public class FileStore extends AbstractFileStore {
             throws IOException {
         Stopwatch watch = Stopwatch.createStarted();
 
         Set<UUID> bulkRefs = newHashSet();
-        Map<TarReader, TarReader> cleaned = newLinkedHashMap();
-
-        long initialSize = 0;
-        fileStoreLock.writeLock().lock();
-        try {
-            gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
-            gcListener.updateStatus(CLEANUP.message());
-
-            newWriter();
-            segmentCache.clear();
+        gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
+        gcListener.updateStatus(CLEANUP.message());
+        segmentCache.clear();
 
-            // Suggest to the JVM that now would be a good time
-            // to clear stale weak references in the SegmentTracker
-            System.gc();
+        // Suggest to the JVM that now would be a good time
+        // to clear stale weak references in the SegmentTracker
+        System.gc();
 
-            for (SegmentId id : tracker.getReferencedSegmentIds()) {
-                if (id.isBulkSegmentId()) {
-                    bulkRefs.add(id.asUUID());
-                }
-            }
-
-            for (TarReader reader : readers) {
-                cleaned.put(reader, reader);
-                initialSize += reader.size();
-            }
-        } finally {
-            fileStoreLock.writeLock().unlock();
+        for (SegmentId id : tracker.getReferencedSegmentIds()) {
+            bulkRefs.add(id.asUUID());
         }
 
-        gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)",
-                GC_COUNT, humanReadableByteCount(initialSize), initialSize);
-
-        Set<UUID> reclaim = newHashSet();
-        for (TarReader reader : cleaned.keySet()) {
-            reader.mark(bulkRefs, reclaim, compactionResult.reclaimer());
-            log.info("{}: size of bulk references/reclaim set {}/{}",
-                    reader, bulkRefs.size(), reclaim.size());
-            if (shutdown) {
-                gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
-                break;
-            }
+        CleanupResult cleanupResult = tarFiles.cleanup(bulkRefs, compactionResult.reclaimer());
+        if (cleanupResult.isInterrupted()) {
+            gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
         }
 
-        Set<UUID> reclaimed = newHashSet();
-        for (TarReader reader : cleaned.keySet()) {
-            cleaned.put(reader, reader.sweep(reclaim, reclaimed));
-            if (shutdown) {
-                gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
-                break;
-            }
-        }
-
-        // it doesn't account for concurrent commits that might have happened
-        long afterCleanupSize = 0;
-
-        List<TarReader> oldReaders = newArrayList();
-        fileStoreLock.writeLock().lock();
-        try {
-            // Replace current list of reader with the cleaned readers taking care not to lose
-            // any new reader that might have come in through concurrent calls to newWriter()
-            List<TarReader> sweptReaders = newArrayList();
-            for (TarReader reader : readers) {
-                if (cleaned.containsKey(reader)) {
-                    TarReader newReader = cleaned.get(reader);
-                    if (newReader != null) {
-                        sweptReaders.add(newReader);
-                        afterCleanupSize += newReader.size();
-                    }
-                    // if these two differ, the former represents the swept version of the latter
-                    if (newReader != reader) {
-                        oldReaders.add(reader);
-                    }
-                } else {
-                    sweptReaders.add(reader);
-                }
-            }
-            readers = sweptReaders;
-        } finally {
-            fileStoreLock.writeLock().unlock();
-        }
-
-        tracker.clearSegmentIdTables(reclaimed, compactionResult.gcInfo());
-
-        // Close old readers *after* setting readers to the new readers to avoid accessing
-        // a closed reader from readSegment()
-        LinkedList<File> toRemove = newLinkedList();
-        for (TarReader oldReader : oldReaders) {
-            closeAndLogOnFail(oldReader);
-            File file = oldReader.getFile();
-            toRemove.addLast(file);
-        }
-
-        gcListener.info("TarMK GC #{}: cleanup marking files for deletion: {}", GC_COUNT, toFileNames(toRemove));
+        tracker.clearSegmentIdTables(cleanupResult.getReclaimedSegmentIds(), compactionResult.gcInfo());
+        gcListener.info("TarMK GC #{}: cleanup marking files for deletion: {}", GC_COUNT, toFileNames(cleanupResult.getRemovableFiles()));
 
         long finalSize = size();
-        long reclaimedSize = initialSize - afterCleanupSize;
+        long reclaimedSize = cleanupResult.getReclaimedSize();
 
         stats.reclaimed(reclaimedSize);
         gcJournal.persist(reclaimedSize, finalSize, getGcGeneration(),
                 compactionMonitor.getCompactedNodes(),
@@ -1046,7 +834,7 @@ public class FileStore extends AbstractFileStore {
                 GC_COUNT, watch, watch.elapsed(MILLISECONDS),
                 humanReadableByteCount(finalSize), finalSize,
                 humanReadableByteCount(reclaimedSize), reclaimedSize);
-        return toRemove;
+        return cleanupResult.getRemovableFiles();
     }
 
     private String toFileNames(@Nonnull List<File> files) {
@@ -1056,7 +844,13 @@ public class FileStore extends AbstractFileStore {
             return Joiner.on(",").join(files);
         }
     }
-
+
+    private void collectBulkReferences(Set<UUID> bulkRefs) {
+        for (SegmentId id : tracker.getReferencedSegmentIds()) {
+            bulkRefs.add(id.asUUID());
+        }
+    }
+
     /**
      * Finds all external blob references that are currently accessible
      * in this repository and adds them to the given collector. Useful
@@ -1071,19 +865,8 @@ public class FileStore extends AbstractFileStore {
      */
     synchronized void collectBlobReferences(ReferenceCollector collector) throws IOException {
         segmentWriter.flush();
-        List<TarReader> tarReaders = newArrayList();
-        fileStoreLock.writeLock().lock();
-        try {
-            newWriter();
-            tarReaders.addAll(FileStore.this.readers);
-        } finally {
-            fileStoreLock.writeLock().unlock();
-        }
-
         int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1;
-        for (TarReader tarReader : tarReaders) {
-            tarReader.collectBlobReferences(collector, minGeneration);
-        }
+        tarFiles.collectBlobReferences(collector, minGeneration);
     }
 
     void cancel() {
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java
index 0cc990cf96..02ce90f009 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java
@@ -51,6 +51,10 @@ public class FileStoreStats implements FileStoreStatsMBean, FileStoreMonitor {
         repoSize.inc(initialSize);
     }
 
+    public void init(long initialSize) {
+        repoSize.inc(initialSize);
+    }
+
     //~-----------------------------< FileStoreMonitor >
 
     @Override
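Review note: FileStore now hands segment identity to TarFiles as the two halves of a UUID (see id.asUUID() and containsSegment(msb, lsb) above). The split is the standard java.util.UUID encoding and round-trips losslessly:

    import java.util.UUID;

    public class UuidBits {
        public static void main(String[] args) {
            UUID id = UUID.randomUUID();
            long msb = id.getMostSignificantBits();
            long lsb = id.getLeastSignificantBits();
            // (msb, lsb) carries exactly the same information as the UUID
            System.out.println(id.equals(new UUID(msb, lsb))); // prints true
        }
    }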
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java
index 74fb2ab81e..17e18f6c86 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java
@@ -67,40 +67,50 @@ class FileStoreUtil {
         return null;
     }
 
+    static boolean containSegment(List<TarReader> readers, SegmentId id) {
+        return containSegment(readers, id.getMostSignificantBits(), id.getLeastSignificantBits());
+    }
+
     /**
      * Check if a segment is contained in one of the provided TAR files.
      *
      * @param readers A list of {@link TarReader} instances.
-     * @param id      An instance of {@link SegmentId}.
+     * @param msb     Most significant bits of the segment ID.
+     * @param lsb     Least significant bits of the segment ID.
      * @return {@code true} if the segment is contained in at least one of the
      *         provided TAR files, {@code false} otherwise.
      */
-    static boolean containSegment(List<TarReader> readers, SegmentId id) {
+    static boolean containSegment(List<TarReader> readers, long msb, long lsb) {
         for (TarReader reader : readers) {
-            if (reader.containsEntry(id.getMostSignificantBits(), id.getLeastSignificantBits())) {
+            if (reader.containsEntry(msb, lsb)) {
                 return true;
             }
         }
         return false;
     }
 
+    static ByteBuffer readEntry(List<TarReader> readers, SegmentId id) {
+        return readEntry(readers, id.getMostSignificantBits(), id.getLeastSignificantBits());
+    }
+
     /**
      * Read the entry corresponding to a segment from one of the provided TAR
      * files.
      *
      * @param readers A list of {@link TarReader} instances.
-     * @param id      An instance of {@link SegmentId}.
+     * @param msb     Most significant bits of the segment ID.
+     * @param lsb     Least significant bits of the segment ID.
      * @return An instance of {@link ByteBuffer} if the entry for the segment
      *         could be found, {@code null} otherwise.
      */
-    static ByteBuffer readEntry(List<TarReader> readers, SegmentId id) {
+    static ByteBuffer readEntry(List<TarReader> readers, long msb, long lsb) {
         for (TarReader reader : readers) {
             if (reader.isClosed()) {
                 log.debug("Skipping closed tar file {}", reader);
                 continue;
             }
             try {
-                ByteBuffer buffer = reader.readEntry(id.getMostSignificantBits(), id.getLeastSignificantBits());
+                ByteBuffer buffer = reader.readEntry(msb, lsb);
                 if (buffer != null) {
                     return buffer;
                 }
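Review note: the ReadOnlyFileStore diff that follows swaps the hand-rolled TarReader list for a TarFiles built with withReadOnly(), so no TarWriter is ever created. A hedged usage sketch, assuming the usual FileStoreBuilder entry point (the path is a placeholder):

    import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;

    import java.io.File;
    import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore;

    public class OpenReadOnly {
        public static void main(String[] args) throws Exception {
            ReadOnlyFileStore store = fileStoreBuilder(new File("segmentstore")).buildReadOnly();
            try {
                // e.g. list the TAR files and the segments they index
                System.out.println(store.getTarReaderIndex().keySet());
            } finally {
                store.close();
            }
        }
    }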
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java
index e3d7fdae56..81a65a469c 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java
@@ -18,19 +18,10 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.util.Collections.emptyMap;
 import static org.apache.jackrabbit.oak.segment.SegmentWriterBuilder.segmentWriterBuilder;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,7 +36,6 @@ import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
 import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +51,7 @@ public class ReadOnlyFileStore extends AbstractFileStore {
     private static final Logger log = LoggerFactory
             .getLogger(ReadOnlyFileStore.class);
 
-    private final List<TarReader> readers;
+    private final TarFiles tarFiles;
 
     @Nonnull
     private final SegmentWriter writer;
@@ -73,21 +63,17 @@ public class ReadOnlyFileStore extends AbstractFileStore {
     ReadOnlyFileStore(FileStoreBuilder builder)
             throws InvalidFileStoreVersionException, IOException {
         super(builder);
 
-        Map<Integer, Map<Character, File>> map = collectFiles(directory);
-
-        if (!map.isEmpty()) {
+        if (notEmptyDirectory(directory)) {
             checkManifest(openManifest());
         }
 
-        this.readers = newArrayListWithCapacity(map.size());
-        Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
-        Arrays.sort(indices);
-        for (int i = indices.length - 1; i >= 0; i--) {
-            // only try to read-only recover the latest file as that might
-            // be the *only* one still being accessed by a writer
-            boolean recover = i == indices.length - 1;
-            readers.add(TarReader.openRO(map.get(indices[i]), memoryMapping, recover, recovery, ioMonitor));
-        }
+        tarFiles = TarFiles.builder()
+                .withDirectory(directory)
+                .withTarRecovery(recovery)
+                .withIOMonitor(ioMonitor)
+                .withMemoryMapping(memoryMapping)
+                .withReadOnly()
+                .build();
 
         writer = segmentWriterBuilder("read-only").withoutCache().build(this);
         log.info("TarMK ReadOnly opened: {} (mmap={})", directory,
@@ -114,25 +100,6 @@ public class ReadOnlyFileStore extends AbstractFileStore {
     }
 
     /**
-     * Include the ids of all segments transitively reachable through forward
-     * references from {@code referencedIds}. See OAK-3864.
-     */
-    private static void includeForwardReferences(Iterable<TarReader> readers,
-            Set<UUID> referencedIds) throws IOException {
-        Set<UUID> fRefs = newHashSet(referencedIds);
-        do {
-            // Add direct forward references
-            for (TarReader reader : readers) {
-                reader.calculateForwardReferences(fRefs);
-                if (fRefs.isEmpty()) {
-                    break; // Optimisation: bail out if no references left
-                }
-            }
-            // ... as long as new forward references are found.
-        } while (referencedIds.addAll(fRefs));
-    }
-
-    /**
      * Build the graph of segments reachable from an initial set of segments
      *
      * @param roots
      *            initial set of segments
@@ -141,15 +108,8 @@ public class ReadOnlyFileStore extends AbstractFileStore {
      * @param visitor
      *            visitor receiving call back while following the segment graph
      * @throws IOException
      */
-    public void traverseSegmentGraph(@Nonnull Set<UUID> roots,
-            @Nonnull SegmentGraphVisitor visitor) throws IOException {
-
-        List<TarReader> readers = this.readers;
-        includeForwardReferences(readers, roots);
-        for (TarReader reader : readers) {
-            reader.traverseSegmentGraph(checkNotNull(roots),
-                    checkNotNull(visitor));
-        }
+    public void traverseSegmentGraph(@Nonnull Set<UUID> roots, @Nonnull SegmentGraphVisitor visitor) throws IOException {
+        tarFiles.traverseSegmentGraph(roots, visitor);
     }
 
     @Override
@@ -159,7 +119,7 @@ public class ReadOnlyFileStore extends AbstractFileStore {
 
     @Override
     public boolean containsSegment(SegmentId id) {
-        return FileStoreUtil.containSegment(readers, id);
+        return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
     }
 
     @Override
@@ -169,26 +129,18 @@ public class ReadOnlyFileStore extends AbstractFileStore {
         return segmentCache.getSegment(id, new Callable<Segment>() {
             @Override
             public Segment call() throws Exception {
-                ByteBuffer buffer = FileStoreUtil.readEntry(readers, id);
-                if (buffer == null) {
-                    throw new SegmentNotFoundException(id);
-                }
-                return new Segment(tracker, segmentReader, id, buffer);
+                return readSegmentUncached(tarFiles, id);
             }
         });
     } catch (ExecutionException e) {
-        throw e.getCause() instanceof SegmentNotFoundException
-                ? (SegmentNotFoundException) e.getCause()
-                : new SegmentNotFoundException(id, e);
+        throw asSegmentNotFoundException(e, id);
     }
 }
 
     @Override
     public void close() {
         Closer closer = Closer.create();
-        for (TarReader r : readers) {
-            closer.register(r);
-        }
+        closer.register(tarFiles);
         closer.register(revisions);
         closeAndLogOnFail(closer);
         System.gc(); // for any memory-mappings that are no longer used
@@ -202,39 +154,19 @@ public class ReadOnlyFileStore extends AbstractFileStore {
     }
 
     public Map<String, Set<UUID>> getTarReaderIndex() {
-        Map<String, Set<UUID>> index = new HashMap<String, Set<UUID>>();
-        for (TarReader reader : readers) {
-            index.put(reader.getFile().getAbsolutePath(), reader.getUUIDs());
-        }
-        return index;
+        return tarFiles.getIndices();
     }
 
-    public Map<UUID, List<UUID>> getTarGraph(String fileName)
-            throws IOException {
-        for (TarReader reader : readers) {
-            if (fileName.equals(reader.getFile().getName())) {
-                Map<UUID, List<UUID>> graph = newHashMap();
-                for (UUID uuid : reader.getUUIDs()) {
-                    graph.put(uuid, null);
-                }
-                Map<UUID, List<UUID>> g = reader.getGraph(false);
-                if (g != null) {
-                    graph.putAll(g);
-                }
-                return graph;
-            }
-        }
-        return emptyMap();
+    public Map<UUID, List<UUID>> getTarGraph(String fileName) throws IOException {
+        return tarFiles.getGraph(fileName);
     }
 
     public Iterable<SegmentId> getSegmentIds() {
-        List<SegmentId> ids = newArrayList();
-        for (TarReader reader : readers) {
-            for (UUID uuid : reader.getUUIDs()) {
-                long msb = uuid.getMostSignificantBits();
-                long lsb = uuid.getLeastSignificantBits();
-                ids.add(tracker.newSegmentId(msb, lsb));
-            }
+        List<SegmentId> ids = new ArrayList<>();
+        for (UUID id : tarFiles.getSegmentIds()) {
+            long msb = id.getMostSignificantBits();
+            long lsb = id.getLeastSignificantBits();
+            ids.add(tracker.newSegmentId(msb, lsb));
         }
         return ids;
     }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
new file mode 100644
index 0000000000..298e0781c5
--- /dev/null
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.newArrayListWithCapacity;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Maps.newLinkedHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+import static org.apache.commons.io.FileUtils.listFiles;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Predicate;
+import com.google.common.io.Closer;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TarFiles implements Closeable {
+
+    static class CleanupResult {
+
+        private boolean interrupted;
+
+        private long reclaimedSize;
+
+        private List<File> removableFiles;
+
+        private Set<UUID> reclaimedSegmentIds;
+
+        private CleanupResult() {
+            // Prevent external instantiation.
+        }
+
+        long getReclaimedSize() {
+            return reclaimedSize;
+        }
+
+        List<File> getRemovableFiles() {
+            return removableFiles;
+        }
+
+        Set<UUID> getReclaimedSegmentIds() {
+            return reclaimedSegmentIds;
+        }
+
+        boolean isInterrupted() {
+            return interrupted;
+        }
+
+    }
+
+    static class Builder {
+
+        private File directory;
+
+        private boolean memoryMapping;
+
+        private TarRecovery tarRecovery;
+
+        private IOMonitor ioMonitor;
+
+        private FileStoreStats fileStoreStats;
+
+        private long maxFileSize;
+
+        private boolean readOnly;
+
+        Builder withDirectory(File directory) {
+            this.directory = checkNotNull(directory);
+            return this;
+        }
+
+        Builder withMemoryMapping(boolean memoryMapping) {
+            this.memoryMapping = memoryMapping;
+            return this;
+        }
+
+        Builder withTarRecovery(TarRecovery tarRecovery) {
+            this.tarRecovery = checkNotNull(tarRecovery);
+            return this;
+        }
+
+        Builder withIOMonitor(IOMonitor ioMonitor) {
+            this.ioMonitor = checkNotNull(ioMonitor);
+            return this;
+        }
+
+        Builder withFileStoreStats(FileStoreStats fileStoreStats) {
+            this.fileStoreStats = checkNotNull(fileStoreStats);
+            return this;
+        }
+
+        Builder withMaxFileSize(long maxFileSize) {
+            checkArgument(maxFileSize > 0);
+            this.maxFileSize = maxFileSize;
+            return this;
+        }
+
+        Builder withReadOnly() {
+            this.readOnly = true;
+            return this;
+        }
+
+        public TarFiles build() throws IOException {
+            checkState(directory != null, "Directory not specified");
+            checkState(tarRecovery != null, "TAR recovery strategy not specified");
+            checkState(ioMonitor != null, "I/O monitor not specified");
+            checkState(readOnly || fileStoreStats != null, "File store statistics not specified");
+            checkState(readOnly || maxFileSize != 0, "Max file size not specified");
+            return new TarFiles(this);
+        }
+
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(TarFiles.class);
+
+    private static final Pattern FILE_NAME_PATTERN = Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
+
+    private static Map<Integer, Map<Character, File>> collectFiles(File directory) {
+        Map<Integer, Map<Character, File>> dataFiles = newHashMap();
+        for (File file : listFiles(directory, null, false)) {
+            Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName());
+            if (matcher.matches()) {
+                Integer index = Integer.parseInt(matcher.group(2));
+                Map<Character, File> files = dataFiles.get(index);
+                if (files == null) {
+                    files = newHashMap();
+                    dataFiles.put(index, files);
+                }
+                Character generation = 'a';
+                if (matcher.group(4) != null) {
+                    generation = matcher.group(4).charAt(0);
+                }
+                checkState(files.put(generation, file) == null);
+            }
+        }
+        return dataFiles;
+    }
+
+    /**
+     * Include the ids of all segments transitively reachable through forward
+     * references from {@code referencedIds}. See OAK-3864.
+     */
+    private static void includeForwardReferences(Iterable<TarReader> readers, Set<UUID> referencedIds) throws IOException {
+        Set<UUID> fRefs = newHashSet(referencedIds);
+        do {
+            // Add direct forward references
+            for (TarReader reader : readers) {
+                reader.calculateForwardReferences(fRefs);
+                if (fRefs.isEmpty()) {
+                    break; // Optimisation: bail out if no references left
+                }
+            }
+            // ... as long as new forward references are found.
+        } while (referencedIds.addAll(fRefs));
+    }
+
+    static Builder builder() {
+        return new Builder();
+    }
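Review note: FILE_NAME_PATTERN and collectFiles() above moved here unchanged from AbstractFileStore: TAR files are keyed by a five-digit write number (group 2) and an optional GC generation letter (group 4, 'a' when absent). A standalone illustration of the grouping:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TarNameDemo {
        public static void main(String[] args) {
            Pattern p = Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
            Matcher m = p.matcher("data00012b.tar");
            if (m.matches()) {
                int index = Integer.parseInt(m.group(2));                          // 12
                char generation = m.group(4) == null ? 'a' : m.group(4).charAt(0); // 'b'
                System.out.println("index=" + index + ", generation=" + generation);
            }
        }
    }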
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final long maxFileSize;
+
+    private final boolean memoryMapping;
+
+    private final IOMonitor ioMonitor;
+
+    private final boolean readOnly;
+
+    private List<TarReader> readers;
+
+    private TarWriter writer;
+
+    private volatile boolean shutdown;
+
+    private boolean closed;
+
+    private TarFiles(Builder builder) throws IOException {
+        maxFileSize = builder.maxFileSize;
+        memoryMapping = builder.memoryMapping;
+        ioMonitor = builder.ioMonitor;
+        readOnly = builder.readOnly;
+        Map<Integer, Map<Character, File>> map = collectFiles(builder.directory);
+        readers = newArrayListWithCapacity(map.size());
+        Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
+        Arrays.sort(indices);
+        for (int i = indices.length - 1; i >= 0; i--) {
+            if (readOnly) {
+                readers.add(TarReader.openRO(map.get(indices[i]), memoryMapping, true, builder.tarRecovery, ioMonitor));
+            } else {
+                readers.add(TarReader.open(map.get(indices[i]), memoryMapping, builder.tarRecovery, ioMonitor));
+            }
+        }
+        if (!readOnly) {
+            int writeNumber = 0;
+            if (indices.length > 0) {
+                writeNumber = indices[indices.length - 1] + 1;
+            }
+            writer = new TarWriter(builder.directory, builder.fileStoreStats, writeNumber, builder.ioMonitor);
+        }
+    }
+
+    private void checkOpen() {
+        checkState(!closed, "This instance has been closed");
+    }
+
+    private void checkReadWrite() {
+        checkState(!readOnly, "This instance is read-only");
+    }
+
+    @Override
+    public void close() throws IOException {
+        shutdown = true;
+        lock.writeLock().lock();
+        try {
+            checkOpen();
+            closed = true;
+            Closer closer = Closer.create();
+            closer.register(writer);
+            writer = null;
+            for (TarReader reader : readers) {
+                closer.register(reader);
+            }
+            readers = null;
+            closer.close();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public String toString() {
+        lock.readLock().lock();
+        try {
+            return "TarFiles{readers=" + readers + ", writer=" + writer + "}";
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    long size() {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            long size = 0;
+            if (!readOnly) {
+                size = writer.fileLength();
+            }
+            for (TarReader reader : readers) {
+                size += reader.size();
+            }
+            return size;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    int readerCount() {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            return readers.size();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    void flush() throws IOException {
+        checkReadWrite();
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            writer.flush();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    boolean containsSegment(long msb, long lsb) {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            if (!readOnly) {
+                if (writer.containsEntry(msb, lsb)) {
+                    return true;
+                }
+            }
+            for (TarReader reader : readers) {
+                if (reader.containsEntry(msb, lsb)) {
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    ByteBuffer readSegment(long msb, long lsb) {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            try {
+                if (!readOnly) {
+                    ByteBuffer buffer = writer.readEntry(msb, lsb);
+                    if (buffer != null) {
+                        return buffer;
+                    }
+                }
+                for (TarReader reader : readers) {
+                    ByteBuffer buffer = reader.readEntry(msb, lsb);
+                    if (buffer != null) {
+                        return buffer;
+                    }
+                }
+            } catch (IOException e) {
+                log.warn("Unable to read from TAR file {}", writer, e);
+            }
+            return null;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    void writeSegment(UUID id, byte[] buffer, int offset, int length, int generation, Set<UUID> references, Set<String> binaryReferences) throws IOException {
+        checkReadWrite();
+        lock.writeLock().lock();
+        try {
+            checkOpen();
+            long size = writer.writeEntry(
+                    id.getMostSignificantBits(),
+                    id.getLeastSignificantBits(),
+                    buffer,
+                    offset,
+                    length,
+                    generation
+            );
+            if (references != null) {
+                for (UUID reference : references) {
+                    writer.addGraphEdge(id, reference);
+                }
+            }
+            if (binaryReferences != null) {
+                for (String reference : binaryReferences) {
+                    writer.addBinaryReference(generation, id, reference);
+                }
+            }
+            if (size >= maxFileSize) {
+                newWriter();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void newWriter() throws IOException {
+        TarWriter newWriter = writer.createNextGeneration();
+        if (newWriter == writer) {
+            return;
+        }
+        File writeFile = writer.getFile();
+        List<TarReader> list = newArrayListWithCapacity(1 + readers.size());
+        list.add(TarReader.open(writeFile, memoryMapping, ioMonitor));
+        list.addAll(readers);
+        readers = list;
+        writer = newWriter;
+    }
+
+    CleanupResult cleanup(Set<UUID> references, Predicate<Integer> reclaimGeneration) throws IOException {
+        checkReadWrite();
+
+        CleanupResult result = new CleanupResult();
+        result.removableFiles = new ArrayList<>();
+        result.reclaimedSegmentIds = new HashSet<>();
+
+        Map<TarReader, TarReader> cleaned = newLinkedHashMap();
+
+        lock.writeLock().lock();
+        lock.readLock().lock();
+        try {
+            try {
+                checkOpen();
+                newWriter();
+            } finally {
+                lock.writeLock().unlock();
+            }
+
+            // At this point the write lock is downgraded to a read lock for
+            // better concurrency. It is always necessary to access TarReader
+            // and TarWriter instances while holding a lock (either in read or
+            // write mode) to prevent a concurrent #close(). In this case, we
+            // don't need an exclusive access to the TarReader instances.
+
+            // TODO now that the two protected sections have been merged thanks
+            // to lock downgrading, check if the following code can be
+            // simplified.
+
+            for (TarReader reader : readers) {
+                cleaned.put(reader, reader);
+                result.reclaimedSize += reader.size();
+            }
+            Set<UUID> reclaim = newHashSet();
+            for (TarReader reader : cleaned.keySet()) {
+                if (shutdown) {
+                    result.interrupted = true;
+                    return result;
+                }
+                reader.mark(references, reclaim, reclaimGeneration);
+                log.info("{}: size of bulk references/reclaim set {}/{}", reader, references.size(), reclaim.size());
+            }
+            for (TarReader reader : cleaned.keySet()) {
+                if (shutdown) {
+                    result.interrupted = true;
+                    return result;
+                }
+                cleaned.put(reader, reader.sweep(reclaim, result.reclaimedSegmentIds));
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        List<TarReader> oldReaders = newArrayList();
+        lock.writeLock().lock();
+        try {
+            // Replace current list of reader with the cleaned readers taking care not to lose
+            // any new reader that might have come in through concurrent calls to newWriter()
+            checkOpen();
+            List<TarReader> sweptReaders = newArrayList();
+            for (TarReader reader : readers) {
+                if (cleaned.containsKey(reader)) {
+                    TarReader newReader = cleaned.get(reader);
+                    if (newReader != null) {
+                        sweptReaders.add(newReader);
+                        result.reclaimedSize -= newReader.size();
+                    }
+                    // if these two differ, the former represents the swept version of the latter
+                    if (newReader != reader) {
+                        oldReaders.add(reader);
+                    }
+                } else {
+                    sweptReaders.add(reader);
+                }
+            }
+            readers = sweptReaders;
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        for (TarReader oldReader : oldReaders) {
+            try {
+                oldReader.close();
+            } catch (IOException e) {
+                log.error("Unable to close swept TAR reader", e);
+            }
+            result.removableFiles.add(oldReader.getFile());
+        }
+
+        return result;
+    }
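Review note: cleanup() above (and collectBlobReferences() below) use ReentrantReadWriteLock's downgrading idiom: the writer rotation happens under the write lock, which is then traded for a read lock so the long mark/sweep phase blocks writers but not concurrent readers. The canonical JDK shape of that idiom, for reference:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class Downgrade {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        void run() {
            lock.writeLock().lock();        // exclusive phase (e.g. rotate the writer)
            try {
                // ... mutate shared state ...
                lock.readLock().lock();     // acquire read before giving up write
            } finally {
                lock.writeLock().unlock();  // downgrade complete
            }
            try {
                // ... long read-only phase (e.g. mark/sweep over the readers) ...
            } finally {
                lock.readLock().unlock();
            }
        }
    }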
+
+    void collectBlobReferences(ReferenceCollector collector, int minGeneration) throws IOException {
+        lock.writeLock().lock();
+        lock.readLock().lock();
+        try {
+            try {
+                checkOpen();
+                if (!readOnly) {
+                    newWriter();
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+
+            // At this point the write lock is downgraded to a read lock for
+            // better concurrency. It is always necessary to access TarReader
+            // and TarWriter instances while holding a lock (either in read or
+            // write mode) to prevent a concurrent #close(). In this case, we
+            // don't need an exclusive access to the TarReader instances.
+
+            for (TarReader reader : readers) {
+                reader.collectBlobReferences(collector, minGeneration);
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    Iterable<UUID> getSegmentIds() {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            List<UUID> ids = new ArrayList<>();
+            for (TarReader reader : readers) {
+                ids.addAll(reader.getUUIDs());
+            }
+            return ids;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    Map<UUID, List<UUID>> getGraph(String fileName) throws IOException {
+        Set<UUID> index = null;
+        Map<UUID, List<UUID>> graph = null;
+
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            for (TarReader reader : readers) {
+                if (fileName.equals(reader.getFile().getName())) {
+                    index = reader.getUUIDs();
+                    graph = reader.getGraph(false);
+                    break;
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        Map<UUID, List<UUID>> result = new HashMap<>();
+        if (index != null) {
+            for (UUID uuid : index) {
+                result.put(uuid, null);
+            }
+        }
+        if (graph != null) {
+            result.putAll(graph);
+        }
+        return result;
+    }
+
+    Map<String, Set<UUID>> getIndices() {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            Map<String, Set<UUID>> index = new HashMap<>();
+            for (TarReader reader : readers) {
+                index.put(reader.getFile().getAbsolutePath(), reader.getUUIDs());
+            }
+            return index;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    void traverseSegmentGraph(Set<UUID> roots, SegmentGraphVisitor visitor) throws IOException {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            includeForwardReferences(readers, roots);
+            for (TarReader reader : readers) {
+                reader.traverseSegmentGraph(roots, visitor);
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+}
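Review note: traverseSegmentGraph() relies on includeForwardReferences(), a fixed-point loop: Set.addAll returns false exactly when an iteration discovered nothing new. The same computation over a plain in-memory graph, as an illustration only (graph and ids are made up):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    public class ReachabilityClosure {
        // Expand 'roots' in place to everything reachable in 'graph'
        static void close(Map<UUID, Set<UUID>> graph, Set<UUID> roots) {
            Set<UUID> frontier = new HashSet<>(roots);
            do {
                Set<UUID> next = new HashSet<>();
                for (UUID id : frontier) {
                    next.addAll(graph.getOrDefault(id, Collections.emptySet()));
                }
                frontier = next;
                // addAll() == false <=> fixed point reached
            } while (roots.addAll(frontier));
        }
    }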