diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/explorer/SegmentTarExplorerBackend.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/explorer/SegmentTarExplorerBackend.java index 239ef140cd..30c081271b 100644 --- a/oak-run/src/main/java/org/apache/jackrabbit/oak/explorer/SegmentTarExplorerBackend.java +++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/explorer/SegmentTarExplorerBackend.java @@ -47,11 +47,13 @@ import org.apache.jackrabbit.oak.segment.SegmentBlob; import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.SegmentNodeState; import org.apache.jackrabbit.oak.segment.SegmentNodeStateHelper; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFile; import org.apache.jackrabbit.oak.segment.SegmentPropertyState; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; import org.apache.jackrabbit.oak.segment.file.JournalEntry; import org.apache.jackrabbit.oak.segment.file.JournalReader; import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore; +import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; import org.apache.jackrabbit.oak.spi.state.NodeState; class SegmentTarExplorerBackend implements ExplorerBackend { @@ -85,7 +87,7 @@ class SegmentTarExplorerBackend implements ExplorerBackend { @Override public List<String> readRevisions() { - File journal = new File(path, "journal.log"); + JournalFile journal = new LocalJournalFile(path, "journal.log"); if (!journal.exists()) { return newArrayList(); diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentArchiveManager.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentArchiveManager.java new file mode 100644 index 0000000000..6b5443ab65 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentArchiveManager.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.jackrabbit.oak.segment; + +import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; +import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor; +import org.apache.jackrabbit.oak.segment.file.tar.TarEntry; +import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; +import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex; +import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException; +import org.apache.jackrabbit.oak.segment.file.tar.index.Index; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * SegmentArchiveManager provides a low-level access to the segment files (eg. + * stored in the .tar). It allows to perform a few FS-like operations (delete, + * rename, copy, etc.) and also opens the segment archives either for reading + * or reading and writing. + */ +public interface SegmentArchiveManager { + + /** + * List names of the available archives. + * + * @return archive list + */ + @Nonnull + List<String> listArchives() throws IOException; + + /** + * Opens a given archive for reading. + * + * @param archiveName + * @return the archive reader or null if the archive doesn't exist + */ + @Nullable + SegmentArchiveReader open(@Nonnull String archiveName) throws IOException; + + /** + * Creates a new archive. + * + * @param archiveName + * @return the archive writer + */ + @Nonnull + SegmentArchiveWriter create(@Nonnull String archiveName); + + /** + * Deletes the archive if exists. + * + * @param archiveName + * @return true if the archive was removed, false otherwise + */ + boolean delete(@Nonnull String archiveName); + + /** + * Renames the archive. + * + * @param from the existing archive + * @param to new name + * @return true if the archive was renamed, false otherwise + */ + boolean renameTo(@Nonnull String from, @Nonnull String to); + + /** + * Copies the archive with all the segments. + * + * @param from the existing archive + * @param to new name + */ + void copyFile(@Nonnull String from, @Nonnull String to) throws IOException; + + /** + * Check if archive exists. + * + * @param archiveName archive to check + * @return true if archive exists, false otherwise + */ + boolean exists(@Nonnull String archiveName); + + /** + * Finds all the segments included in the archive. + * + * @param archiveName archive to recover + * @param entries results will be put there, in the order of presence in the + * archive + */ + void recoverEntries(@Nonnull String archiveName, @Nonnull LinkedHashMap<UUID, byte[]> entries) throws IOException; + + /** + * Allows to write in the new archive. + */ + interface SegmentArchiveWriter { + + /** + * Write the new segment to the archive. + * + * @param msb + * @param lsb + * @param data + * @param offset + * @param size + * @param generation + * @return the entry representing the new segment. Can be later used for the {@link #readSegment(TarEntry)} method. + */ + @Nonnull + TarEntry writeSegment(long msb, long lsb, @Nonnull byte[] data, int offset, int size, @Nonnull GCGeneration generation) throws IOException; + + /** + * Read the segment.
+ * + * @param tarEntry + * @return byte buffer containing the segment data or null if segment doesn't exist + */ + @Nullable + ByteBuffer readSegment(@Nonnull TarEntry tarEntry) throws IOException; + + /** + * Write the index data. + * + * @param data + */ + void writeIndex(@Nonnull byte[] data) throws IOException; + + /** + * Write the graph data. + * + * @param data + */ + void writeGraph(@Nonnull byte[] data) throws IOException; + + /** + * Write the binary references data. + * + * @param data + */ + void writeBinaryReferences(@Nonnull byte[] data) throws IOException; + + /** + * Get the current length of the archive. + * + * @return length of the archive, in bytes + */ + long getLength(); + + /** + * Close the archive. + */ + void close() throws IOException; + + /** + * Check if the archive has been created (eg. something has been written). + * + * @return true if the archive has been created, false otherwise + */ + boolean isCreated(); + + /** + * Flush all the data to the storage. + */ + void flush() throws IOException; + + /** + * Get the name of the archive. + * + * @return archive name + */ + @Nonnull + String getName(); + } + + interface SegmentArchiveReader { + + /** + * Read the segment. + * + * @param msb + * @param lsb + * @return byte buffer containing the segment data or null if segment doesn't exist + */ + @Nullable + ByteBuffer readSegment(long msb, long lsb) throws IOException; + + /** + * Returns the index. + * + * @return segment index + */ + @Nonnull + Index getIndex(); + + /** + * Loads and returns the graph. + * + * @return the segment graph or null if the persisted graph doesn't exist. + */ + @Nullable + Map<UUID, List<UUID>> getGraph() throws IOException; + + /** + * Check if the persisted graph exists. + * + * @return true if the graph exists, false otherwise + */ + boolean hasGraph(); + + /** + * Loads and returns the binary references. + * + * @return binary references + */ + @Nonnull + BinaryReferencesIndex getBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException; + + /** + * Get the current length of the archive. + * + * @return length of the archive, in bytes + */ + long length(); + + /** + * Get the name of the archive. + * + * @return archive name + */ + @Nonnull + String getName(); + + /** + * Close the archive. + */ + void close() throws IOException; + + /** + * Returns the size of the entry + * @param size + * @return + */ + int getEntrySize(int size); + } + +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStorePersistence.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStorePersistence.java new file mode 100644 index 0000000000..ff5ade11e5 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStorePersistence.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment; + +import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +public interface SegmentNodeStorePersistence { + + SegmentArchiveManager createArchiveManager(boolean memoryMapping, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) throws IOException; + + boolean segmentFilesExist(); + + JournalFile getJournalFile(String journalFile); + + GCJournalFile getGCJournalFile(String gcJournalFile); + + ManifestFile getManifestFile(String manifestFile); + + RepositoryLock lockRepository(String lockFileName) throws IOException; + + interface JournalFile { + + JournalFileReader openJournalReader() throws IOException; + + JournalFileWriter openJournalWriter() throws IOException; + + String getName(); + + boolean exists(); + } + + interface JournalFileReader extends Closeable { + + String readLine() throws IOException; + + } + + interface JournalFileWriter extends Closeable { + + void truncate() throws IOException; + + void writeLine(String line) throws IOException; + + } + + interface GCJournalFile { + + void writeLine(String line) throws IOException; + + List readLines() throws IOException; + + } + + interface ManifestFile { + + boolean exists(); + + Properties load() throws IOException; + + void save(Properties properties) throws IOException; + + } + + interface RepositoryLock { + + void unlock() throws IOException; + + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java index 8996918699..cb6af7cfd0 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java @@ -24,7 +24,6 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -32,7 +31,6 @@ import java.util.UUID; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; -import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.segment.CachingSegmentReader; import org.apache.jackrabbit.oak.segment.RecordType; @@ -45,6 +43,7 @@ import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.SegmentIdFactory; import org.apache.jackrabbit.oak.segment.SegmentIdProvider; import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; import org.apache.jackrabbit.oak.segment.SegmentReader; import org.apache.jackrabbit.oak.segment.SegmentStore; @@ -88,20 +87,15 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable { */ private static final int MAX_STORE_VERSION = 2; - static ManifestChecker newManifestChecker(File directory, boolean strictVersionCheck) { + static ManifestChecker newManifestChecker(SegmentNodeStorePersistence persistence, boolean strictVersionCheck) { return 
ManifestChecker.newManifestChecker( - new File(directory, MANIFEST_FILE_NAME), - notEmptyDirectory(directory), + persistence.getManifestFile(MANIFEST_FILE_NAME), + persistence.segmentFilesExist(), strictVersionCheck ? MAX_STORE_VERSION : MIN_STORE_VERSION, MAX_STORE_VERSION ); } - private static boolean notEmptyDirectory(File path) { - Collection entries = FileUtils.listFiles(path, new String[] {"tar"}, false); - return !entries.isEmpty(); - } - @Nonnull final SegmentTracker tracker; diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileReaper.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileReaper.java index 7818af1d20..06c4657ffb 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileReaper.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileReaper.java @@ -19,37 +19,42 @@ package org.apache.jackrabbit.oak.segment.file; import static com.google.common.collect.Lists.newArrayList; -import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.HashSet; import java.util.List; import java.util.Set; import com.google.common.base.Joiner; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Thread-safe class tracking files to be removed. */ -class FileReaper { +public class FileReaper { private static final Logger logger = LoggerFactory.getLogger(FileReaper.class); - private final Set files = new HashSet<>(); + private final Set files = new HashSet<>(); private final Object lock = new Object(); + private final SegmentArchiveManager archiveManager; + + public FileReaper(SegmentArchiveManager archiveManager) { + this.archiveManager = archiveManager; + } + /** * Add files to be removed. The same file can be added more than once. * Duplicates are ignored. * * @param files group of files to be removed. */ - void add(Iterable files) { + void add(Iterable files) { synchronized (lock) { - for (File file : files) { + for (String file : files) { this.files.add(file); } } @@ -59,21 +64,20 @@ class FileReaper { * Reap previously added files. 
*/ void reap() { - Set reap; + Set reap; synchronized (lock) { reap = new HashSet<>(files); files.clear(); } - Set redo = new HashSet<>(); - List removed = newArrayList(); - for (File file : reap) { - try { - Files.deleteIfExists(file.toPath()); + Set redo = new HashSet<>(); + List removed = newArrayList(); + for (String file : reap) { + if (archiveManager.delete(file)) { removed.add(file); - } catch (IOException e) { - logger.warn(String.format("Unable to remove file %s", file), e); + } else { + logger.warn("Unable to remove file {}", file); redo.add(file); } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index bce27fe1b7..104ada5070 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -44,12 +44,8 @@ import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.IDLE; import static org.apache.jackrabbit.oak.segment.file.TarRevisions.EXPEDITE_OPTION; import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.channels.FileLock; -import java.nio.channels.OverlappingFileLockException; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; @@ -79,6 +75,7 @@ import org.apache.jackrabbit.oak.segment.Segment; import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.SegmentNodeBuilder; import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener; import org.apache.jackrabbit.oak.segment.SegmentWriter; @@ -89,7 +86,6 @@ import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser; import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; -import org.apache.jackrabbit.oak.segment.file.tar.TarFiles.CleanupResult; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -127,10 +123,7 @@ public class FileStore extends AbstractFileStore { private final TarFiles tarFiles; - private final RandomAccessFile lockFile; - - @Nonnull - private final FileLock lock; + private final SegmentNodeStorePersistence.RepositoryLock repositoryLock; private volatile TarRevisions revisions; @@ -144,7 +137,7 @@ public class FileStore extends AbstractFileStore { * not be removed immediately, because they first need to be closed, and the * JVM needs to release the memory mapped file references. 
*/ - private final FileReaper fileReaper = new FileReaper(); + private final FileReaper fileReaper; /** * This flag is periodically updated by calling the {@code SegmentGCOptions} @@ -168,13 +161,8 @@ public class FileStore extends AbstractFileStore { FileStore(final FileStoreBuilder builder) throws InvalidFileStoreVersionException, IOException { super(builder); - lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw"); - try { - lock = lockFile.getChannel().lock(); - } catch (OverlappingFileLockException ex) { - throw new IllegalStateException(directory.getAbsolutePath() - + " is in use by another store.", ex); - } + SegmentNodeStorePersistence persistence = builder.getPersistence(); + repositoryLock = persistence.lockRepository(LOCK_FILE_NAME); this.segmentWriter = defaultSegmentWriterBuilder("sys") .withGeneration(() -> getGcGeneration().nonGC()) @@ -185,24 +173,29 @@ public class FileStore extends AbstractFileStore { this.garbageCollector = new GarbageCollector( builder.getGcOptions(), builder.getGcListener(), - new GCJournal(directory), + new GCJournal(persistence.getGCJournalFile(GCJournal.GC_JOURNAL)), builder.getCacheManager() .withAccessTracking("COMPACT", builder.getStatsProvider())); - newManifestChecker(directory, builder.getStrictVersionCheck()).checkAndUpdateManifest(); + newManifestChecker(persistence, builder.getStrictVersionCheck()).checkAndUpdateManifest(); this.stats = new FileStoreStats(builder.getStatsProvider(), this, 0); - this.tarFiles = TarFiles.builder() + + TarFiles.Builder tarFilesBuilder = TarFiles.builder() .withDirectory(directory) .withMemoryMapping(memoryMapping) .withTarRecovery(recovery) .withIOMonitor(ioMonitor) .withFileStoreMonitor(stats) .withMaxFileSize(builder.getMaxFileSize() * MB) - .build(); + .withPersistence(builder.getPersistence()); + + this.tarFiles = tarFilesBuilder.build(); long size = this.tarFiles.size(); this.stats.init(size); + this.fileReaper = this.tarFiles.createFileReaper(); + this.snfeListener = builder.getSnfeListener(); fileStoreScheduler.scheduleAtFixedRate( @@ -465,8 +458,7 @@ public class FileStore extends AbstractFileStore { } Closer closer = Closer.create(); - closer.register(lockFile); - closer.register(lock::release); + closer.register(repositoryLock::unlock); closer.register(tarFiles) ; closer.register(revisions); @@ -1035,7 +1027,7 @@ public class FileStore extends AbstractFileStore { * @throws IOException */ @Nonnull - private List cleanup(@Nonnull CompactionResult compactionResult) + private List cleanup(@Nonnull CompactionResult compactionResult) throws IOException { Stopwatch watch = Stopwatch.createStarted(); @@ -1047,7 +1039,7 @@ public class FileStore extends AbstractFileStore { // to clear stale weak references in the SegmentTracker System.gc(); - CleanupResult cleanupResult = tarFiles.cleanup(newCleanupContext(compactionResult.reclaimer())); + TarFiles.CleanupResult cleanupResult = tarFiles.cleanup(newCleanupContext(compactionResult.reclaimer())); if (cleanupResult.isInterrupted()) { gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT); } @@ -1069,7 +1061,7 @@ public class FileStore extends AbstractFileStore { return cleanupResult.getRemovableFiles(); } - private String toFileNames(@Nonnull List files) { + private String toFileNames(@Nonnull List files) { if (files.isEmpty()) { return "none"; } else { diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java index 0901899fb0..2301da4501 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java @@ -40,6 +40,7 @@ import com.google.common.base.Predicate; import org.apache.jackrabbit.oak.segment.CacheWeights.NodeCacheWeigher; import org.apache.jackrabbit.oak.segment.CacheWeights.StringCacheWeigher; import org.apache.jackrabbit.oak.segment.CacheWeights.TemplateCacheWeigher; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.apache.jackrabbit.oak.segment.RecordCache; import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener; @@ -47,6 +48,7 @@ import org.apache.jackrabbit.oak.segment.WriterCacheManager; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor; import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.gc.DelegatingGCMonitor; import org.apache.jackrabbit.oak.spi.gc.GCMonitor; @@ -88,6 +90,8 @@ public class FileStoreBuilder { private boolean memoryMapping = MEMORY_MAPPING_DEFAULT; + private SegmentNodeStorePersistence persistence; + @Nonnull private StatisticsProvider statsProvider = StatisticsProvider.NOOP; @@ -139,6 +143,7 @@ public class FileStoreBuilder { private FileStoreBuilder(@Nonnull File directory) { this.directory = checkNotNull(directory); this.gcListener.registerGCMonitor(new LoggingGCMonitor(LOG)); + this.persistence = new TarPersistence(directory); } /** @@ -314,6 +319,11 @@ public class FileStoreBuilder { return this; } + public FileStoreBuilder withCustomPersistence(SegmentNodeStorePersistence persistence) throws IOException { + this.persistence = persistence; + return this; + } + /** * Create a new {@link FileStore} instance with the settings specified in this * builder. If none of the {@code with} methods have been called before calling @@ -336,7 +346,7 @@ public class FileStoreBuilder { checkState(!built, "Cannot re-use builder"); built = true; directory.mkdirs(); - TarRevisions revisions = new TarRevisions(directory); + TarRevisions revisions = new TarRevisions(persistence); LOG.info("Creating file store {}", this); FileStore store; try { @@ -376,7 +386,7 @@ public class FileStoreBuilder { checkState(directory.exists() && directory.isDirectory(), "%s does not exist or is not a directory", directory); built = true; - ReadOnlyRevisions revisions = new ReadOnlyRevisions(directory); + ReadOnlyRevisions revisions = new ReadOnlyRevisions(persistence); LOG.info("Creating file store {}", this); ReadOnlyFileStore store; try { @@ -443,6 +453,10 @@ public class FileStoreBuilder { return snfeListener; } + SegmentNodeStorePersistence getPersistence() { + return persistence; + } + /** * @return creates or returns the {@code WriterCacheManager} this builder passes or * passed to the store on {@link #build()}. 
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java index 2a4fc3ddb6..6414287342 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java @@ -17,11 +17,11 @@ package org.apache.jackrabbit.oak.segment.file; -import java.io.File; import java.io.IOException; import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.SegmentIdProvider; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.SegmentStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,9 +45,9 @@ class FileStoreUtil { * found. * @throws IOException If an I/O error occurs. */ - static RecordId findPersistedRecordId(SegmentStore store, SegmentIdProvider idProvider, File journal) + static RecordId findPersistedRecordId(SegmentStore store, SegmentIdProvider idProvider, SegmentNodeStorePersistence.JournalFile journalFile) throws IOException { - try (JournalReader journalReader = new JournalReader(journal)) { + try (JournalReader journalReader = new JournalReader(journalFile)) { while (journalReader.hasNext()) { JournalEntry entry = journalReader.next(); try { diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCJournal.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCJournal.java index 1fd8851a78..d806dd5e79 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCJournal.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCJournal.java @@ -19,20 +19,10 @@ package org.apache.jackrabbit.oak.segment.file; -import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Preconditions.checkNotNull; -import static java.nio.file.Files.newBufferedWriter; -import static java.nio.file.Files.readAllLines; -import static java.nio.file.StandardOpenOption.APPEND; -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.DSYNC; -import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration; -import java.io.BufferedWriter; -import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -42,6 +32,7 @@ import javax.annotation.Nonnull; import com.google.common.base.Joiner; import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.GCJournalFile; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,13 +49,12 @@ public class GCJournal { public static final String GC_JOURNAL = "gc.log"; - @Nonnull - private final File directory; + private final GCJournalFile journalFile; private GCJournalEntry latest; - public GCJournal(@Nonnull File directory) { - this.directory = checkNotNull(directory); + public GCJournal(@Nonnull GCJournalFile journalFile) { + this.journalFile = journalFile; } /** @@ -90,13 +80,8 @@ public class GCJournal { } latest = new GCJournalEntry(repoSize, reclaimedSize, System.currentTimeMillis(), gcGeneration, nodes, checkNotNull(root)); - Path path = new File(directory, 
GC_JOURNAL).toPath(); try { - try (BufferedWriter w = newBufferedWriter(path, UTF_8, WRITE, - APPEND, CREATE, DSYNC)) { - w.write(latest.toString()); - w.newLine(); - } + journalFile.writeLine(latest.toString()); } catch (IOException e) { LOG.error("Error writing gc journal", e); } @@ -130,13 +115,10 @@ public class GCJournal { } private List readLines() { - File file = new File(directory, GC_JOURNAL); - if (file.exists()) { - try { - return readAllLines(file.toPath(), UTF_8); - } catch (IOException e) { - LOG.error("Error reading gc journal", e); - } + try { + return journalFile.readLines(); + } catch (IOException e) { + LOG.error("Error reading gc journal", e); } return new ArrayList(); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/JournalReader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/JournalReader.java index eca8bfda03..b341881cf1 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/JournalReader.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/JournalReader.java @@ -19,14 +19,11 @@ package org.apache.jackrabbit.oak.segment.file; -import static java.nio.charset.Charset.defaultCharset; - import java.io.Closeable; -import java.io.File; import java.io.IOException; import java.util.List; -import org.apache.commons.io.input.ReversedLinesFileReader; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,10 +37,10 @@ import com.google.common.collect.AbstractIterator; public final class JournalReader extends AbstractIterator implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(JournalReader.class); - private final ReversedLinesFileReader journal; + private final SegmentNodeStorePersistence.JournalFileReader reader; - public JournalReader(File journalFile) throws IOException { - journal = new ReversedLinesFileReader(journalFile, defaultCharset()); + public JournalReader(SegmentNodeStorePersistence.JournalFile journal) throws IOException { + this.reader = journal.openJournalReader(); } /** @@ -54,7 +51,7 @@ public final class JournalReader extends AbstractIterator implemen protected JournalEntry computeNext() { try { String line = null; - while ((line = journal.readLine()) != null) { + while ((line = reader.readLine()) != null) { if (line.indexOf(' ') != -1) { List splits = Splitter.on(' ').splitToList(line); String revision = splits.get(0); @@ -83,6 +80,6 @@ public final class JournalReader extends AbstractIterator implemen @Override public void close() throws IOException { - journal.close(); + reader.close(); } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/LocalGCJournalFile.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/LocalGCJournalFile.java new file mode 100644 index 0000000000..7826844d7f --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/LocalGCJournalFile.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file; + +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Charsets.UTF_8; +import static java.nio.file.Files.newBufferedWriter; +import static java.nio.file.Files.readAllLines; +import static java.nio.file.StandardOpenOption.APPEND; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.DSYNC; +import static java.nio.file.StandardOpenOption.WRITE; + +public class LocalGCJournalFile implements SegmentNodeStorePersistence.GCJournalFile { + + private final File file; + + public LocalGCJournalFile(File parent, String name) { + this(new File(parent, name)); + } + + public LocalGCJournalFile(File file) { + this.file = file; + } + + @Override + public void writeLine(String line) throws IOException { + try (BufferedWriter w = newBufferedWriter(file.toPath(), UTF_8, WRITE, APPEND, CREATE, DSYNC)) { + w.write(line); + w.newLine(); + } + } + + @Override + public List readLines() throws IOException { + if (file.exists()) { + return readAllLines(file.toPath(), UTF_8); + } + return new ArrayList(); + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/LocalManifestFile.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/LocalManifestFile.java new file mode 100644 index 0000000000..f6a97dde14 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/LocalManifestFile.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.file; + +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.ManifestFile; + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +public class LocalManifestFile implements ManifestFile { + + private final File file; + + public LocalManifestFile(File parent, String name) { + this(new File(parent, name)); + } + + public LocalManifestFile(File file) { + this.file = file; + } + + @Override + public boolean exists() { + return file.exists(); + } + + @Override + public Properties load() throws IOException { + Properties properties = new Properties(); + try (FileReader r = new FileReader(file)) { + properties.load(r); + } + return properties; + } + + @Override + public void save(Properties properties) throws IOException { + properties.store(new FileWriter(file), null); + } + +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Manifest.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Manifest.java index 0041154943..7fdfc9589b 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Manifest.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Manifest.java @@ -17,9 +17,9 @@ package org.apache.jackrabbit.oak.segment.file; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.ManifestFile; + import java.io.IOException; import java.util.Properties; @@ -34,12 +34,8 @@ class Manifest { * @return A manifest file. * @throws IOException If any error occurs when loading the manifest. */ - static Manifest load(File file) throws IOException { - Properties properties = new Properties(); - try (FileReader r = new FileReader(file)) { - properties.load(r); - } - return new Manifest(properties); + static Manifest load(ManifestFile file) throws IOException { + return new Manifest(file.load()); } /** @@ -85,8 +81,8 @@ class Manifest { * @param file The file to save the manifest to. * @throws IOException if an error occurs while saving the manifest. 
*/ - void save(File file) throws IOException { - properties.store(new FileWriter(file), null); + void save(ManifestFile file) throws IOException { + file.save(properties); } private int getIntegerProperty(String name, int otherwise) { diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ManifestChecker.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ManifestChecker.java index 5b5daa9bee..78fe74412e 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ManifestChecker.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ManifestChecker.java @@ -17,21 +17,22 @@ package org.apache.jackrabbit.oak.segment.file; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.ManifestFile; + import static com.google.common.base.Preconditions.checkArgument; -import java.io.File; import java.io.IOException; public class ManifestChecker { - public static ManifestChecker newManifestChecker(File path, boolean shouldExist, int minStoreVersion, int maxStoreVersion) { - checkArgument(path != null, "path"); + public static ManifestChecker newManifestChecker(ManifestFile file, boolean shouldExist, int minStoreVersion, int maxStoreVersion) { + checkArgument(file != null, "file"); checkArgument(minStoreVersion > 0, "minStoreVersion"); checkArgument(maxStoreVersion > 0, "maxStoreVersion"); - return new ManifestChecker(path, shouldExist, minStoreVersion, maxStoreVersion); + return new ManifestChecker(file, shouldExist, minStoreVersion, maxStoreVersion); } - private final File file; + private final ManifestFile file; private final boolean shouldExist; @@ -39,7 +40,7 @@ public class ManifestChecker { private final int maxStoreVersion; - private ManifestChecker(File file, boolean shouldExist, int minStoreVersion, int maxStoreVersion) { + private ManifestChecker(ManifestFile file, boolean shouldExist, int minStoreVersion, int maxStoreVersion) { this.file = file; this.shouldExist = shouldExist; this.minStoreVersion = minStoreVersion; diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java index de9bc9fb30..b08ad32a76 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java @@ -63,7 +63,7 @@ public class ReadOnlyFileStore extends AbstractFileStore { ReadOnlyFileStore(FileStoreBuilder builder) throws InvalidFileStoreVersionException, IOException { super(builder); - newManifestChecker(directory, builder.getStrictVersionCheck()).checkManifest(); + newManifestChecker(builder.getPersistence(), builder.getStrictVersionCheck()).checkManifest(); tarFiles = TarFiles.builder() .withDirectory(directory) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyRevisions.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyRevisions.java index 4e6b513b47..948d23081a 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyRevisions.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyRevisions.java @@ -23,9 +23,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.jackrabbit.oak.segment.file.FileStoreUtil.findPersistedRecordId; import java.io.Closeable; -import 
java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; @@ -34,31 +32,21 @@ import com.google.common.base.Function; import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.Revisions; import org.apache.jackrabbit.oak.segment.SegmentIdProvider; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.SegmentStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ReadOnlyRevisions implements Revisions, Closeable { - private static final Logger LOG = LoggerFactory - .getLogger(ReadOnlyRevisions.class); - public static final String JOURNAL_FILE_NAME = "journal.log"; @Nonnull private final AtomicReference head; @Nonnull - private final File directory; - - @Nonnull - private final RandomAccessFile journalFile; + private final SegmentNodeStorePersistence.JournalFile journalFile; - public ReadOnlyRevisions(@Nonnull File directory) throws IOException { - this.directory = checkNotNull(directory); - this.journalFile = new RandomAccessFile(new File(directory, - JOURNAL_FILE_NAME), "r"); - this.journalFile.seek(journalFile.length()); + public ReadOnlyRevisions(@Nonnull SegmentNodeStorePersistence persistence) throws IOException { + this.journalFile = checkNotNull(persistence).getJournalFile(JOURNAL_FILE_NAME); this.head = new AtomicReference<>(null); } @@ -74,7 +62,7 @@ public class ReadOnlyRevisions implements Revisions, Closeable { if (head.get() != null) { return; } - RecordId persistedId = findPersistedRecordId(store, idProvider, new File(directory, JOURNAL_FILE_NAME)); + RecordId persistedId = findPersistedRecordId(store, idProvider, journalFile); if (persistedId == null) { throw new IllegalStateException("Cannot start readonly store from empty journal"); } @@ -112,14 +100,8 @@ public class ReadOnlyRevisions implements Revisions, Closeable { throw new UnsupportedOperationException("ReadOnly Revisions"); } - /** - * Close the underlying journal file. 
- * - * @throws IOException - */ @Override public void close() throws IOException { - journalFile.close(); + // do nothing } - } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java index 963894d838..154349c95f 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java @@ -19,16 +19,13 @@ package org.apache.jackrabbit.oak.segment.file; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static java.lang.Long.MAX_VALUE; import static java.util.concurrent.TimeUnit.DAYS; import static org.apache.jackrabbit.oak.segment.file.FileStoreUtil.findPersistedRecordId; import java.io.Closeable; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -44,6 +41,7 @@ import com.google.common.base.Supplier; import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.Revisions; import org.apache.jackrabbit.oak.segment.SegmentIdProvider; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.SegmentStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,14 +70,15 @@ public class TarRevisions implements Revisions, Closeable { @Nonnull private final AtomicReference head; - @Nonnull - private final File directory; + private final SegmentNodeStorePersistence persistence; + + private final SegmentNodeStorePersistence.JournalFile journalFile; /** - * The journal file. It is protected by {@link #journalFileLock}. It becomes + * The journal file writer. It is protected by {@link #journalFileLock}. It becomes * {@code null} after it's closed. */ - private volatile RandomAccessFile journalFile; + private volatile SegmentNodeStorePersistence.JournalFileWriter journalFileWriter; /** * The persisted head of the root journal, used to determine whether the @@ -138,16 +137,15 @@ public class TarRevisions implements Revisions, Closeable { /** * Create a new instance placing the journal log file into the passed * {@code directory}. 
- * @param directory directory of the journal file + * @param persistence object representing the segment persistence * @throws IOException */ - public TarRevisions(@Nonnull File directory) throws IOException { - this.directory = checkNotNull(directory); - this.journalFile = new RandomAccessFile(new File(directory, - JOURNAL_FILE_NAME), "rw"); - this.journalFile.seek(journalFile.length()); + public TarRevisions(SegmentNodeStorePersistence persistence) throws IOException { + this.journalFile = persistence.getJournalFile(JOURNAL_FILE_NAME); + this.journalFileWriter = journalFile.openJournalWriter(); this.head = new AtomicReference<>(null); this.persistedHead = new AtomicReference<>(null); + this.persistence = persistence; } /** @@ -164,7 +162,7 @@ public class TarRevisions implements Revisions, Closeable { if (head.get() != null) { return; } - RecordId persistedId = findPersistedRecordId(store, idProvider, new File(directory, JOURNAL_FILE_NAME)); + RecordId persistedId = findPersistedRecordId(store, idProvider, journalFile); if (persistedId == null) { head.set(writeInitialNode.get()); } else { @@ -226,7 +224,7 @@ public class TarRevisions implements Revisions, Closeable { } private void doFlush(Flusher flusher) throws IOException { - if (journalFile == null) { + if (journalFileWriter == null) { LOG.debug("No journal file available, skipping flush"); return; } @@ -238,8 +236,7 @@ public class TarRevisions implements Revisions, Closeable { } flusher.flush(); LOG.debug("TarMK journal update {} -> {}", before, after); - journalFile.writeBytes(after.toString10() + " root " + System.currentTimeMillis() + "\n"); - journalFile.getChannel().force(false); + journalFileWriter.writeLine(after.toString10() + " root " + System.currentTimeMillis()); persistedHead.set(after); } @@ -354,11 +351,11 @@ public class TarRevisions implements Revisions, Closeable { public void close() throws IOException { journalFileLock.lock(); try { - if (journalFile == null) { + if (journalFileWriter == null) { return; } - journalFile.close(); - journalFile = null; + journalFileWriter.close(); + journalFileWriter = null; } finally { journalFileLock.unlock(); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/GraphLoader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/GraphLoader.java new file mode 100644 index 0000000000..b7a876e790 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/GraphLoader.java @@ -0,0 +1,98 @@ +package org.apache.jackrabbit.oak.segment.file.tar; + +import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.zip.CRC32; + +import static com.google.common.collect.Lists.newArrayListWithCapacity; +import static com.google.common.collect.Maps.newHashMapWithExpectedSize; +import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.GRAPH_MAGIC; + +public final class GraphLoader { + + private static final Logger log = LoggerFactory.getLogger(GraphLoader.class); + + private static final int FOOTER_SIZE = 16; + + private GraphLoader() { + } + + /** + * Loads the optional pre-compiled graph entry from the given tar file. 
+ * + * @return the graph or {@code null} if one was not found + * @throws IOException if the tar file could not be read + */ + public static ByteBuffer loadGraph(ReaderAtEnd readerAtEnd) throws IOException { + ByteBuffer meta = readerAtEnd.readAtEnd(FOOTER_SIZE, FOOTER_SIZE); + + int crc32 = meta.getInt(); + int count = meta.getInt(); + int bytes = meta.getInt(); + int magic = meta.getInt(); + + if (magic != GRAPH_MAGIC) { + log.warn("Invalid graph magic number"); + return null; + } + + if (count < 0) { + log.warn("Invalid number of entries"); + return null; + } + + if (bytes < 4 + count * 34) { + log.warn("Invalid entry size"); + return null; + } + + ByteBuffer graph = readerAtEnd.readAtEnd(bytes, bytes); + + byte[] b = new byte[bytes - FOOTER_SIZE]; + + graph.mark(); + graph.get(b); + graph.reset(); + + CRC32 checksum = new CRC32(); + checksum.update(b); + + if (crc32 != (int) checksum.getValue()) { + log.warn("Invalid graph checksum in tar file"); + return null; + } + + return graph; + } + + public static Map<UUID, List<UUID>> parseGraph(ByteBuffer buffer) { + int nEntries = buffer.getInt(buffer.limit() - 12); + + Map<UUID, List<UUID>> graph = newHashMapWithExpectedSize(nEntries); + + for (int i = 0; i < nEntries; i++) { + long msb = buffer.getLong(); + long lsb = buffer.getLong(); + int nVertices = buffer.getInt(); + + List<UUID> vertices = newArrayListWithCapacity(nVertices); + + for (int j = 0; j < nVertices; j++) { + long vMsb = buffer.getLong(); + long vLsb = buffer.getLong(); + vertices.add(new UUID(vMsb, vLsb)); + } + + graph.put(new UUID(msb, lsb), vertices); + } + + return graph; + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java new file mode 100644 index 0000000000..f3931c6e2d --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.jackrabbit.oak.segment.file.tar; + +import org.apache.commons.io.input.ReversedLinesFileReader; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +import static java.nio.charset.Charset.defaultCharset; + +public class LocalJournalFile implements SegmentNodeStorePersistence.JournalFile { + + private final File journalFile; + + public LocalJournalFile(File directory, String journalFile) { + this.journalFile = new File(directory, journalFile); + } + + public LocalJournalFile(File journalFile) { + this.journalFile = journalFile; + } + + @Override + public SegmentNodeStorePersistence.JournalFileReader openJournalReader() throws IOException { + return new LocalJournalFileReader(journalFile); + } + + @Override + public SegmentNodeStorePersistence.JournalFileWriter openJournalWriter() throws IOException { + return new LocalJournalFileWriter(journalFile); + } + + @Override + public String getName() { + return journalFile.getName(); + } + + @Override + public boolean exists() { + return journalFile.exists(); + } + + private static class LocalJournalFileReader implements SegmentNodeStorePersistence.JournalFileReader { + + private final ReversedLinesFileReader journal; + + public LocalJournalFileReader(File file) throws IOException { + journal = new ReversedLinesFileReader(file, defaultCharset()); + } + + @Override + public String readLine() throws IOException { + return journal.readLine(); + } + + @Override + public void close() throws IOException { + journal.close(); + } + } + + private static class LocalJournalFileWriter implements SegmentNodeStorePersistence.JournalFileWriter { + + private final RandomAccessFile journalFile; + + public LocalJournalFileWriter(File file) throws IOException { + journalFile = new RandomAccessFile(file, "rw"); + journalFile.seek(journalFile.length()); + } + + @Override + public void truncate() throws IOException { + journalFile.setLength(0); + } + + @Override + public void writeLine(String line) throws IOException { + journalFile.writeBytes(line + "\n"); + journalFile.getChannel().force(false); + } + + @Override + public void close() throws IOException { + journalFile.close(); + } + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java new file mode 100644 index 0000000000..045b67d516 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.file.tar; + +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.file.tar.index.Index; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.zip.CRC32; + +import static com.google.common.base.Charsets.UTF_8; +import static java.nio.ByteBuffer.wrap; +import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE; + +public class SegmentTarManager implements SegmentArchiveManager { + + /** + * Pattern of the segment entry names. Note the trailing (\\..*)? group + * that's included for compatibility with possible future extensions. + */ + private static final Pattern NAME_PATTERN = Pattern.compile( + "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" + + "(\\.([0-9a-f]{8}))?(\\..*)?"); + + private static final Logger log = LoggerFactory.getLogger(SegmentTarManager.class); + + private final File segmentstoreDir; + + private final FileStoreMonitor fileStoreMonitor; + + private final IOMonitor ioMonitor; + + private final boolean memoryMapping; + + public SegmentTarManager(File segmentstoreDir, FileStoreMonitor fileStoreMonitor, IOMonitor ioMonitor, boolean memoryMapping) { + this.segmentstoreDir = segmentstoreDir; + this.fileStoreMonitor = fileStoreMonitor; + this.ioMonitor = ioMonitor; + this.memoryMapping = memoryMapping; + } + + @Override + public List listArchives() { + return Arrays.asList(segmentstoreDir.list()); + } + + @Override + public SegmentArchiveReader open(String name) throws IOException { + File file = new File(segmentstoreDir, name); + RandomAccessFile access = new RandomAccessFile(file, "r"); + try { + Index index = SegmentTarReader.loadAndValidateIndex(access, name); + if (index == null) { + log.info("No index found in tar file {}, skipping...", name); + return null; + } else { + if (memoryMapping) { + try { + FileAccess mapped = new FileAccess.Mapped(access); + return new SegmentTarReader(file, mapped, index, ioMonitor); + } catch (IOException e) { + log.warn("Failed to mmap tar file {}. Falling back to normal file " + + "IO, which will negatively impact repository performance. " + + "This problem may have been caused by restrictions on the " + + "amount of virtual memory available to the JVM. 
Please make " + + "sure that a 64-bit JVM is being used and that the process " + + "has access to unlimited virtual memory (ulimit option -v).", + name, e); + } + } + + FileAccess random = new FileAccess.Random(access); + // prevent the finally block from closing the file + // as the returned TarReader will take care of that + access = null; + return new SegmentTarReader(file, random, index, ioMonitor); + } + } finally { + if (access != null) { + access.close(); + } + } + } + + @Override + public SegmentArchiveWriter create(String archiveName) { + return new SegmentTarWriter(new File(segmentstoreDir, archiveName), fileStoreMonitor, ioMonitor); + } + + @Override + public boolean delete(String archiveName) { + try { + return Files.deleteIfExists(new File(segmentstoreDir, archiveName).toPath()); + } catch (IOException e) { + log.error("Can't remove archive {}", archiveName, e); + return false; + } + } + + @Override + public boolean renameTo(String from, String to) { + try { + Files.move(new File(segmentstoreDir, from).toPath(), new File(segmentstoreDir, to).toPath()); + return true; + } catch (IOException e) { + log.error("Can't move archive {} to {}", from, to, e); + return false; + } + } + + @Override + public void copyFile(String from, String to) throws IOException { + Files.copy(new File(segmentstoreDir, from).toPath(), new File(segmentstoreDir, to).toPath()); + } + + @Override + public boolean exists(String archiveName) { + return new File(segmentstoreDir, archiveName).exists(); + } + + @Override + public void recoverEntries(String archiveName, LinkedHashMap entries) throws IOException { + File file = new File(segmentstoreDir, archiveName); + RandomAccessFile access = new RandomAccessFile(file, "r"); + try { + recoverEntries(file, access, entries); + } finally { + access.close(); + } + } + + /** + * Scans through the tar file, looking for all segment entries. + * + * @param file The path of the TAR file. + * @param access The contents of the TAR file. + * @param entries The map that will contain the recovered entries. The + * entries are inserted in the {@link LinkedHashMap} in the + * order they appear in the TAR file. 
+ */ + private static void recoverEntries(File file, RandomAccessFile access, LinkedHashMap entries) throws IOException { + byte[] header = new byte[BLOCK_SIZE]; + while (access.getFilePointer() + BLOCK_SIZE <= access.length()) { + // read the tar header block + access.readFully(header); + + // compute the header checksum + int sum = 0; + for (int i = 0; i < BLOCK_SIZE; i++) { + sum += header[i] & 0xff; + } + + + // identify possible zero block + if (sum == 0 && access.getFilePointer() + 2 * BLOCK_SIZE == access.length()) { + return; // found the zero blocks at the end of the file + } + + // replace the actual stored checksum with spaces for comparison + for (int i = 148; i < 148 + 8; i++) { + sum -= header[i] & 0xff; + sum += ' '; + } + + byte[] checkbytes = String.format("%06o\0 ", sum).getBytes(UTF_8); + for (int i = 0; i < checkbytes.length; i++) { + if (checkbytes[i] != header[148 + i]) { + log.warn("Invalid entry checksum at offset {} in tar file {}, skipping...", + access.getFilePointer() - BLOCK_SIZE, file); + } + } + + // The header checksum passes, so read the entry name and size + ByteBuffer buffer = wrap(header); + String name = readString(buffer, 100); + buffer.position(124); + int size = readNumber(buffer, 12); + if (access.getFilePointer() + size > access.length()) { + // checksum was correct, so the size field should be accurate + log.warn("Partial entry {} in tar file {}, ignoring...", name, file); + return; + } + + Matcher matcher = NAME_PATTERN.matcher(name); + if (matcher.matches()) { + UUID id = UUID.fromString(matcher.group(1)); + + String checksum = matcher.group(3); + if (checksum != null || !entries.containsKey(id)) { + byte[] data = new byte[size]; + access.readFully(data); + + // skip possible padding to stay at block boundaries + long position = access.getFilePointer(); + long remainder = position % BLOCK_SIZE; + if (remainder != 0) { + access.seek(position + (BLOCK_SIZE - remainder)); + } + + if (checksum != null) { + CRC32 crc = new CRC32(); + crc.update(data); + if (crc.getValue() != Long.parseLong(checksum, 16)) { + log.warn("Checksum mismatch in entry {} of tar file {}, skipping...", + name, file); + continue; + } + } + + entries.put(id, data); + } + } else if (!name.equals(file.getName() + ".idx")) { + log.warn("Unexpected entry {} in tar file {}, skipping...", + name, file); + long position = access.getFilePointer() + size; + long remainder = position % BLOCK_SIZE; + if (remainder != 0) { + position += BLOCK_SIZE - remainder; + } + access.seek(position); + } + } + } + + private static String readString(ByteBuffer buffer, int fieldSize) { + byte[] b = new byte[fieldSize]; + buffer.get(b); + int n = 0; + while (n < fieldSize && b[n] != 0) { + n++; + } + return new String(b, 0, n, UTF_8); + } + + private static int readNumber(ByteBuffer buffer, int fieldSize) { + byte[] b = new byte[fieldSize]; + buffer.get(b); + int number = 0; + for (int i = 0; i < fieldSize; i++) { + int digit = b[i] & 0xff; + if ('0' <= digit && digit <= '7') { + number = number * 8 + digit - '0'; + } else { + break; + } + } + return number; + } + +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java new file mode 100644 index 0000000000..87fd96300b --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
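// A sketch of driving the FS-like operations of SegmentTarManager shown above to
// recover segments from the archives of an existing store. The "segmentstore"
// directory name and the no-op IOMonitorAdapter/FileStoreMonitorAdapter helpers
// are assumptions of this sketch.

import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.UUID;

import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.SegmentTarManager;

public class RecoverArchivesSketch {

    public static void main(String[] args) throws IOException {
        SegmentArchiveManager manager = new SegmentTarManager(
                new File("segmentstore"),       // store location (assumed)
                new FileStoreMonitorAdapter(),  // no-op write statistics
                new IOMonitorAdapter(),         // no-op I/O callbacks
                false);                         // no memory mapping

        for (String archive : manager.listArchives()) {
            // listArchives() returns every file in the directory, so filter here.
            if (!archive.endsWith(".tar")) {
                continue;
            }
            // Collect every readable segment of the archive, in storage order.
            LinkedHashMap<UUID, byte[]> entries = new LinkedHashMap<>();
            manager.recoverEntries(archive, entries);
            System.out.println(archive + ": recovered " + entries.size() + " segments");
        }
    }
}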
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file.tar; + +import com.google.common.base.Stopwatch; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex; +import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader; +import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException; +import org.apache.jackrabbit.oak.segment.file.tar.index.Index; +import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry; +import org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader; +import org.apache.jackrabbit.oak.segment.file.tar.index.InvalidIndexException; +import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.jackrabbit.oak.segment.file.tar.SegmentTarWriter.getPaddingSize; +import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE; +import static org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader.newIndexLoader; + +public class SegmentTarReader implements SegmentArchiveManager.SegmentArchiveReader { + + private static final Logger log = LoggerFactory.getLogger(SegmentTarReader.class); + + private static final IndexLoader indexLoader = newIndexLoader(BLOCK_SIZE); + + private final FileAccess access; + + private final File file; + + private final IOMonitor ioMonitor; + + private final String name; + + private final Index index; + + private volatile Boolean hasGraph; + + public SegmentTarReader(File file, FileAccess access, Index index, IOMonitor ioMonitor) { + this.access = access; + this.file = file; + this.index = index; + this.name = file.getName(); + this.ioMonitor = ioMonitor; + } + + @Override + public ByteBuffer readSegment(long msb, long lsb) throws IOException { + int i = index.findEntry(msb, lsb); + if (i == -1) { + return null; + } + IndexEntry indexEntry = index.entry(i); + ioMonitor.beforeSegmentRead(file, msb, lsb, indexEntry.getLength()); + Stopwatch stopwatch = Stopwatch.createStarted(); + ByteBuffer buffer = access.read(indexEntry.getPosition(), indexEntry.getLength()); + long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + ioMonitor.afterSegmentRead(file, msb, lsb, indexEntry.getLength(), elapsed); + return buffer; + } + + @Override + public Index getIndex() { + return index; + } + + public static Index loadAndValidateIndex(RandomAccessFile file, String name) throws IOException { + long length = file.length(); + if (length % 
BLOCK_SIZE != 0) { + log.warn("Unable to load index of file {}: Invalid alignment", name); + return null; + } + if (length < 6 * BLOCK_SIZE) { + log.warn("Unable to load index of file {}: File too short", name); + return null; + } + if (length > Integer.MAX_VALUE) { + log.warn("Unable to load index of file {}: File too long", name); + return null; + } + ReaderAtEnd r = (whence, size) -> { + ByteBuffer buffer = ByteBuffer.allocate(size); + file.seek(length - 2 * BLOCK_SIZE - whence); + file.readFully(buffer.array()); + return buffer; + }; + try { + return indexLoader.loadIndex(r); + } catch (InvalidIndexException e) { + log.warn("Unable to load index of file {}: {}", name, e.getMessage()); + } + return null; + } + + @Override + public Map> getGraph() throws IOException { + ByteBuffer graph = loadGraph(); + if (graph == null) { + return null; + } else { + return GraphLoader.parseGraph(graph); + } + } + + @Override + public boolean hasGraph() { + if (hasGraph == null) { + try { + loadGraph(); + } catch (IOException ignore) { } + } + return hasGraph; + } + + private ByteBuffer loadGraph() throws IOException { + int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize(); + ByteBuffer graph = GraphLoader.loadGraph((whence, amount) -> access.read(end - whence, amount)); + hasGraph = graph != null; + return graph; + } + + @Override + public BinaryReferencesIndex getBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException { + int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize() - getGraphEntrySize(); + return BinaryReferencesIndexLoader.loadBinaryReferencesIndex((whence, size) -> access.read(end - whence, size)); + } + + @Override + public long length() { + return file.length(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void close() throws IOException { + access.close(); + } + + @Override + public int getEntrySize(int size) { + return BLOCK_SIZE + size + getPaddingSize(size); + } + + private int getIndexEntrySize() { + return getEntrySize(index.size()); + } + + private int getGraphEntrySize() { + ByteBuffer buffer; + + try { + buffer = loadGraph(); + } catch (IOException e) { + log.warn("Exception while loading pre-compiled tar graph", e); + return 0; + } + + if (buffer == null) { + return 0; + } + + return getEntrySize(buffer.getInt(buffer.limit() - 8)); + } + + +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java new file mode 100644 index 0000000000..0a1846ab09 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
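// Read-path sketch: opening one archive through the manager and fetching a single
// segment via the SegmentArchiveReader implemented above. The archive name, the
// segment UUID, the store directory and the monitor adapters are illustrative
// assumptions.

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;

import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.SegmentArchiveManager.SegmentArchiveReader;
import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.SegmentTarManager;

public class ReadSegmentSketch {

    public static void main(String[] args) throws IOException {
        SegmentArchiveManager manager = new SegmentTarManager(
                new File("segmentstore"), new FileStoreMonitorAdapter(),
                new IOMonitorAdapter(), true); // try memory mapping, fall back to plain IO

        SegmentArchiveReader reader = manager.open("data00000a.tar"); // example archive
        if (reader == null) {
            return; // the archive has no valid index
        }
        try {
            UUID id = UUID.fromString("0aaecbd9-9590-4e74-a504-ba0ca79ad235"); // example id
            ByteBuffer segment = reader.readSegment(
                    id.getMostSignificantBits(), id.getLeastSignificantBits());
            if (segment != null) {
                System.out.println("read " + segment.remaining() + " bytes from " + reader.getName());
            }
        } finally {
            reader.close();
        }
    }
}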
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file.tar; + +import com.google.common.base.Stopwatch; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.zip.CRC32; + +import static com.google.common.base.Charsets.UTF_8; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE; + +public class SegmentTarWriter implements SegmentArchiveManager.SegmentArchiveWriter { + + private static final Logger log = LoggerFactory.getLogger(SegmentTarWriter.class); + + private static final byte[] ZERO_BYTES = new byte[BLOCK_SIZE]; + + private final FileStoreMonitor monitor; + + /** + * The file being written. This instance is also used as an additional + * synchronization point by {@link #flush()} and {@link #close()} to + * allow {@link #flush()} to work concurrently with normal reads and + * writes, but not with a concurrent {@link #close()}. + */ + private final File file; + + private final IOMonitor ioMonitor; + + /** + * File handle. Initialized lazily in {@link #writeSegment(long, long, byte[], int, int, GCGeneration)} + * to avoid creating an extra empty file when just reading from the repository. + * Should only be accessed from synchronized code. + */ + private RandomAccessFile access = null; + + private FileChannel channel = null; + + private volatile long length; + + public SegmentTarWriter(File file, FileStoreMonitor monitor, IOMonitor ioMonitor) { + this.file = file; + this.monitor = monitor; + this.ioMonitor = ioMonitor; + } + + @Override + public TarEntry writeSegment(long msb, long lsb, byte[] data, int offset, int size, GCGeneration generation) throws IOException { + UUID uuid = new UUID(msb, lsb); + CRC32 checksum = new CRC32(); + checksum.update(data, offset, size); + String entryName = String.format("%s.%08x", uuid, checksum.getValue()); + byte[] header = newEntryHeader(entryName, size); + + log.debug("Writing segment {} to {}", uuid, file); + + if (access == null) { + access = new RandomAccessFile(file, "rw"); + channel = access.getChannel(); + } + + int padding = getPaddingSize(size); + + long initialLength = access.getFilePointer(); + + access.write(header); + + long dataOffset = access.getFilePointer(); + + ioMonitor.beforeSegmentWrite(file, msb, lsb, size); + Stopwatch stopwatch = Stopwatch.createStarted(); + access.write(data, offset, size); + ioMonitor.afterSegmentWrite(file, msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS)); + + if (padding > 0) { + access.write(ZERO_BYTES, 0, padding); + } + + long currentLength = access.getFilePointer(); + monitor.written(currentLength - initialLength); + + length = currentLength; + + return new TarEntry(msb, lsb, (int) dataOffset, size, generation); + } + + @Override + public ByteBuffer readSegment(TarEntry tarEntry) throws IOException { + checkState(channel != null); // implied by entry != null + ByteBuffer data = ByteBuffer.allocate(tarEntry.size()); + channel.read(data, tarEntry.offset()); + data.rewind(); + return data; + } + + @Override + public void 
writeIndex(byte[] data) throws IOException { + byte[] header = newEntryHeader(file.getName() + ".idx", data.length); + access.write(header); + access.write(data); + monitor.written(header.length + data.length); + + length = access.getFilePointer(); + } + + @Override + public void writeGraph(byte[] data) throws IOException { + int paddingSize = getPaddingSize(data.length); + byte[] header = newEntryHeader(file.getName() + ".gph", data.length + paddingSize); + access.write(header); + if (paddingSize > 0) { + access.write(ZERO_BYTES, 0, paddingSize); + } + access.write(data); + monitor.written(header.length + paddingSize + data.length); + + length = access.getFilePointer(); + } + + @Override + public void writeBinaryReferences(byte[] data) throws IOException { + int paddingSize = getPaddingSize(data.length); + byte[] header = newEntryHeader(file.getName() + ".brf", data.length + paddingSize); + access.write(header); + if (paddingSize > 0) { + access.write(ZERO_BYTES, 0, paddingSize); + } + access.write(data); + monitor.written(header.length + paddingSize + data.length); + + length = access.getFilePointer(); + } + + @Override + public long getLength() { + return length; + } + + @Override + public void close() throws IOException { + access.write(ZERO_BYTES); + access.write(ZERO_BYTES); + access.close(); + + monitor.written(BLOCK_SIZE * 2); + } + + @Override + public boolean isCreated() { + return access != null; + } + + @Override + public void flush() throws IOException { + access.getFD().sync(); + } + + @Override + public String getName() { + return file.getName(); + } + + private static byte[] newEntryHeader(String name, int size) { + byte[] header = new byte[BLOCK_SIZE]; + + // File name + byte[] nameBytes = name.getBytes(UTF_8); + System.arraycopy( + nameBytes, 0, header, 0, Math.min(nameBytes.length, 100)); + + // File mode + System.arraycopy( + String.format("%07o", 0400).getBytes(UTF_8), 0, + header, 100, 7); + + // User's numeric user ID + System.arraycopy( + String.format("%07o", 0).getBytes(UTF_8), 0, + header, 108, 7); + + // Group's numeric user ID + System.arraycopy( + String.format("%07o", 0).getBytes(UTF_8), 0, + header, 116, 7); + + // File size in bytes (octal basis) + System.arraycopy( + String.format("%011o", size).getBytes(UTF_8), 0, + header, 124, 11); + + // Last modification time in numeric Unix time format (octal) + long time = System.currentTimeMillis() / 1000; + System.arraycopy( + String.format("%011o", time).getBytes(UTF_8), 0, + header, 136, 11); + + // Checksum for header record + System.arraycopy( + new byte[] {' ', ' ', ' ', ' ', ' ', ' ', ' ', ' '}, 0, + header, 148, 8); + + // Type flag + header[156] = '0'; + + // Compute checksum + int checksum = 0; + for (byte aHeader : header) { + checksum += aHeader & 0xff; + } + System.arraycopy( + String.format("%06o\0 ", checksum).getBytes(UTF_8), 0, + header, 148, 8); + + return header; + } + + static int getPaddingSize(int size) { + int remainder = size % BLOCK_SIZE; + if (remainder > 0) { + return BLOCK_SIZE - remainder; + } else { + return 0; + } + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java index f81da2259b..55e4087ce7 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java @@ -17,7 +17,7 @@ package 
org.apache.jackrabbit.oak.segment.file.tar; -class TarConstants { +public class TarConstants { private TarConstants() { // Prevent instantiation. @@ -42,11 +42,11 @@ class TarConstants { * (size, checksum, the number of UUIDs). * */ - static final int GRAPH_MAGIC = ('\n' << 24) + ('0' << 16) + ('G' << 8) + '\n'; + public static final int GRAPH_MAGIC = ('\n' << 24) + ('0' << 16) + ('G' << 8) + '\n'; /** * The tar file block size. */ - static final int BLOCK_SIZE = 512; + public static final int BLOCK_SIZE = 512; } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java index b9daeb1427..efcb158f97 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java @@ -24,7 +24,7 @@ import java.util.Comparator; * A file entry location in a tar file. This is used for the index with a tar * file. */ -class TarEntry { +public class TarEntry { /** Size in bytes a tar entry takes up in the tar file */ static final int SIZE = 33; @@ -52,7 +52,7 @@ class TarEntry { private final GCGeneration generation; - TarEntry(long msb, long lsb, int offset, int size, GCGeneration generation) { + public TarEntry(long msb, long lsb, int offset, int size, GCGeneration generation) { this.msb = msb; this.lsb = lsb; this.offset = offset; @@ -60,19 +60,19 @@ class TarEntry { this.generation = generation; } - long msb() { + public long msb() { return msb; } - long lsb() { + public long lsb() { return lsb; } - int offset() { + public int offset() { return offset; } - int size() { + public int size() { return size; } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java index 0ddfa3de33..1f0be25e35 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java @@ -24,7 +24,6 @@ import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Sets.newHashSet; import static java.util.Collections.emptySet; -import static org.apache.commons.io.FileUtils.listFiles; import java.io.Closeable; import java.io.File; @@ -52,6 +51,9 @@ import javax.annotation.Nonnull; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.file.FileReaper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +78,7 @@ public class TarFiles implements Closeable { private long reclaimedSize; - private List removableFiles; + private List removableFiles; private Set reclaimedSegmentIds; @@ -88,7 +90,7 @@ public class TarFiles implements Closeable { return reclaimedSize; } - public List getRemovableFiles() { + public List getRemovableFiles() { return removableFiles; } @@ -118,6 +120,8 @@ public class TarFiles implements Closeable { private boolean readOnly; + private SegmentNodeStorePersistence persistence; + private Builder() { // Prevent external instantiation. 
} @@ -158,15 +162,54 @@ public class TarFiles implements Closeable { return this; } + public Builder withPersistence(SegmentNodeStorePersistence persistence) { + this.persistence = persistence; + return this; + } + public TarFiles build() throws IOException { checkState(directory != null, "Directory not specified"); checkState(tarRecovery != null, "TAR recovery strategy not specified"); checkState(ioMonitor != null, "I/O monitor not specified"); checkState(readOnly || fileStoreMonitor != null, "File store statistics not specified"); checkState(readOnly || maxFileSize != 0, "Max file size not specified"); + if (persistence == null) { + persistence = new TarPersistence(directory); + } return new TarFiles(this); } + public File getDirectory() { + return directory; + } + + public boolean isMemoryMapping() { + return memoryMapping; + } + + public TarRecovery getTarRecovery() { + return tarRecovery; + } + + public IOMonitor getIoMonitor() { + return ioMonitor; + } + + public FileStoreMonitor getFileStoreMonitor() { + return fileStoreMonitor; + } + + public long getMaxFileSize() { + return maxFileSize; + } + + public boolean isReadOnly() { + return readOnly; + } + + private SegmentArchiveManager buildArchiveManager() throws IOException { + return persistence.createArchiveManager(memoryMapping, ioMonitor, fileStoreMonitor); + } } private static final Logger log = LoggerFactory.getLogger(TarFiles.class); @@ -218,13 +261,13 @@ public class TarFiles implements Closeable { }; } - private static Map> collectFiles(File directory) { - Map> dataFiles = newHashMap(); - for (File file : listFiles(directory, null, false)) { - Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName()); + private static Map> collectFiles(SegmentArchiveManager archiveManager) throws IOException { + Map> dataFiles = newHashMap(); + for (String file : archiveManager.listArchives()) { + Matcher matcher = FILE_NAME_PATTERN.matcher(file); if (matcher.matches()) { Integer index = Integer.parseInt(matcher.group(2)); - Map files = dataFiles.get(index); + Map files = dataFiles.get(index); if (files == null) { files = newHashMap(); dataFiles.put(index, files); @@ -245,9 +288,7 @@ public class TarFiles implements Closeable { private final long maxFileSize; - private final boolean memoryMapping; - - private final IOMonitor ioMonitor; + private SegmentArchiveManager archiveManager; /** * Guards access to the {@link #readers} and {@link #writer} references. 
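// Sketch of wiring a persistence implementation into TarFiles through the new
// withPersistence() hook added above. The builder() factory and the other with*()
// setters are assumed from the pre-existing Builder API; the no-op recovery lambda,
// the monitor adapters, the directory name and the 256 MB limit are placeholders.

import java.io.File;
import java.io.IOException;

import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;

public class CustomPersistenceSketch {

    public static void main(String[] args) throws IOException {
        File directory = new File("segmentstore");
        directory.mkdirs();

        TarFiles tarFiles = TarFiles.builder()
                .withDirectory(directory)
                .withTarRecovery((id, data, recovery) -> {})        // no-op recovery
                .withIOMonitor(new IOMonitorAdapter())
                .withFileStoreMonitor(new FileStoreMonitorAdapter())
                .withMaxFileSize(256 * 1024 * 1024)
                // Explicit here only for illustration; build() falls back to the same
                // TarPersistence when no persistence is configured.
                .withPersistence(new TarPersistence(directory))
                .build();

        tarFiles.close();
    }
}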
@@ -281,9 +322,9 @@ public class TarFiles implements Closeable { private TarFiles(Builder builder) throws IOException { maxFileSize = builder.maxFileSize; - memoryMapping = builder.memoryMapping; - ioMonitor = builder.ioMonitor; - Map> map = collectFiles(builder.directory); + archiveManager = builder.buildArchiveManager(); + + Map> map = collectFiles(archiveManager); Integer[] indices = map.keySet().toArray(new Integer[map.size()]); Arrays.sort(indices); @@ -295,9 +336,9 @@ public class TarFiles implements Closeable { for (Integer index : indices) { TarReader r; if (builder.readOnly) { - r = TarReader.openRO(map.get(index), memoryMapping, builder.tarRecovery, ioMonitor); + r = TarReader.openRO(map.get(index), builder.tarRecovery, archiveManager); } else { - r = TarReader.open(map.get(index), memoryMapping, builder.tarRecovery, ioMonitor); + r = TarReader.open(map.get(index), builder.tarRecovery, archiveManager); } readers = new Node(r, readers); } @@ -308,10 +349,9 @@ public class TarFiles implements Closeable { if (indices.length > 0) { writeNumber = indices[indices.length - 1] + 1; } - writer = new TarWriter(builder.directory, builder.fileStoreMonitor, writeNumber, builder.ioMonitor); + writer = new TarWriter(archiveManager, writeNumber); } - @Override public void close() throws IOException { shutdown = true; @@ -510,7 +550,7 @@ public class TarFiles implements Closeable { if (newWriter == writer) { return; } - readers = new Node(TarReader.open(writer.getFile(), memoryMapping, ioMonitor), readers); + readers = new Node(TarReader.open(writer.getFileName(), archiveManager), readers); writer = newWriter; } @@ -657,7 +697,7 @@ public class TarFiles implements Closeable { } catch (IOException e) { log.warn("Unable to close swept TAR reader", e); } - result.removableFiles.add(closeable.getFile()); + result.removableFiles.add(closeable.getFileName()); } return result; @@ -711,7 +751,7 @@ public class TarFiles implements Closeable { Map> graph = null; for (TarReader reader : iterable(head)) { - if (fileName.equals(reader.getFile().getName())) { + if (fileName.equals(reader.getFileName())) { index = reader.getUUIDs(); graph = reader.getGraph(); break; @@ -744,9 +784,12 @@ public class TarFiles implements Closeable { Map> index = new HashMap<>(); for (TarReader reader : iterable(head)) { - index.put(reader.getFile().getName(), reader.getUUIDs()); + index.put(reader.getFileName(), reader.getUUIDs()); } return index; } + public FileReaper createFileReaper() { + return new FileReaper(archiveManager); + } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java new file mode 100644 index 0000000000..dac7662cf4 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file.tar; + +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.file.LocalGCJournalFile; +import org.apache.jackrabbit.oak.segment.file.LocalManifestFile; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.util.Collection; + +public class TarPersistence implements SegmentNodeStorePersistence { + + private final File directory; + + public TarPersistence(File directory) { + this.directory = directory; + } + + @Override + public SegmentArchiveManager createArchiveManager(boolean memoryMapping, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) { + return new SegmentTarManager(directory, fileStoreMonitor, ioMonitor, memoryMapping); + } + + @Override + public boolean segmentFilesExist() { + Collection entries = FileUtils.listFiles(directory, new String[] {"tar"}, false); + return !entries.isEmpty(); + } + + @Override + public JournalFile getJournalFile(String journalFile) { + return new LocalJournalFile(directory, journalFile); + } + + @Override + public GCJournalFile getGCJournalFile(String gcJournalFile) { + return new LocalGCJournalFile(directory, gcJournalFile); + } + + @Override + public ManifestFile getManifestFile(String manifestFile) { + return new LocalManifestFile(directory, manifestFile); + } + + @Override + public RepositoryLock lockRepository(String lockName) throws IOException { + RandomAccessFile lockFile = new RandomAccessFile(new File(directory, lockName), "rw"); + try { + FileLock lock = lockFile.getChannel().lock(); + return () -> { + lock.release(); + lockFile.close(); + }; + } catch (OverlappingFileLockException ex) { + throw new IllegalStateException(directory.getAbsolutePath() + " is in use by another store.", ex); + } + } + +} \ No newline at end of file diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java index 5c321d6658..3bdc551368 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java @@ -19,24 +19,16 @@ package org.apache.jackrabbit.oak.segment.file.tar; -import static com.google.common.base.Charsets.UTF_8; import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Lists.newArrayListWithCapacity; -import static com.google.common.collect.Maps.newHashMapWithExpectedSize; import static com.google.common.collect.Maps.newLinkedHashMap; import static com.google.common.collect.Maps.newTreeMap; import static com.google.common.collect.Sets.newHashSet; -import static java.nio.ByteBuffer.wrap; import static java.util.Collections.singletonList; import static 
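// Sketch of the persistence entry points implemented by TarPersistence above: the
// file store can obtain the archive manager, probe for existing segments and reach
// a journal without touching java.io.File itself. The directory name, the monitor
// adapters, the scratch journal name and the written line are placeholders.

import java.io.File;
import java.io.IOException;

import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFile;
import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFileWriter;
import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;

public class PersistenceSketch {

    public static void main(String[] args) throws IOException {
        SegmentNodeStorePersistence persistence = new TarPersistence(new File("segmentstore"));

        // Replaces the direct *.tar directory listing previously done by the store.
        System.out.println("segment files exist: " + persistence.segmentFilesExist());

        SegmentArchiveManager manager = persistence.createArchiveManager(
                false, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
        System.out.println("archives: " + manager.listArchives());

        // Journal access goes through the same abstraction; a scratch file is used
        // here so the real journal.log is left untouched.
        JournalFile journal = persistence.getJournalFile("journal.log.example");
        JournalFileWriter writer = journal.openJournalWriter();
        try {
            writer.writeLine("placeholder entry, not a real journal record");
        } finally {
            writer.close();
        }
    }
}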
org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration; -import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE; -import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.GRAPH_MAGIC; -import static org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader.newIndexLoader; import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -47,48 +39,25 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.zip.CRC32; import javax.annotation.Nonnull; import com.google.common.base.Predicate; -import com.google.common.base.Stopwatch; -import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex; -import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader; import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException; import org.apache.jackrabbit.oak.segment.file.tar.index.Index; import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry; -import org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader; -import org.apache.jackrabbit.oak.segment.file.tar.index.InvalidIndexException; -import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class TarReader implements Closeable { +public class TarReader implements Closeable { private static final Logger log = LoggerFactory.getLogger(TarReader.class); - private static final IndexLoader indexLoader = newIndexLoader(BLOCK_SIZE); - - /** - * Pattern of the segment entry names. Note the trailing (\\..*)? group - * that's included for compatibility with possible future extensions. - */ - private static final Pattern NAME_PATTERN = Pattern.compile( - "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" - + "(\\.([0-9a-f]{8}))?(\\..*)?"); - - private static int getEntrySize(int size) { - return BLOCK_SIZE + size + TarWriter.getPaddingSize(size); - } - - static TarReader open(File file, boolean memoryMapping, IOMonitor ioMonitor) throws IOException { - TarReader reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor); + static TarReader open(String file, SegmentArchiveManager archiveManager) throws IOException { + TarReader reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); if (reader != null) { return reader; } else { @@ -107,21 +76,17 @@ class TarReader implements Closeable { * generations. * * @param files The generations of the same TAR file. - * @param memoryMapping If {@code true}, opens the TAR file with memory - * mapping enabled. * @param recovery Strategy for recovering a damaged TAR file. - * @param ioMonitor Callbacks to track internal operations for the open - * TAR file. * @return An instance of {@link TarReader}. 
*/ - static TarReader open(Map files, boolean memoryMapping, TarRecovery recovery, IOMonitor ioMonitor) throws IOException { - SortedMap sorted = newTreeMap(); + static TarReader open(Map files, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { + SortedMap sorted = newTreeMap(); sorted.putAll(files); - List list = newArrayList(sorted.values()); + List list = newArrayList(sorted.values()); Collections.reverse(list); - TarReader reader = openFirstFileWithValidIndex(list, memoryMapping, ioMonitor); + TarReader reader = openFirstFileWithValidIndex(list, archiveManager); if (reader != null) { return reader; } @@ -129,15 +94,15 @@ class TarReader implements Closeable { // no generation has a valid index, so recover as much as we can log.warn("Could not find a valid tar index in {}, recovering...", list); LinkedHashMap entries = newLinkedHashMap(); - for (File file : sorted.values()) { - collectFileEntries(file, entries, true); + for (String file : sorted.values()) { + collectFileEntries(file, entries, true, archiveManager); } // regenerate the first generation based on the recovered data - File file = sorted.values().iterator().next(); - generateTarFile(entries, file, recovery, ioMonitor); + String file = sorted.values().iterator().next(); + generateTarFile(entries, file, recovery, archiveManager); - reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor); + reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); if (reader != null) { return reader; } else { @@ -145,11 +110,11 @@ class TarReader implements Closeable { } } - static TarReader openRO(Map files, boolean memoryMapping, TarRecovery recovery, IOMonitor ioMonitor) throws IOException { + static TarReader openRO(Map files, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { // for readonly store only try the latest generation of a given // tar file to prevent any rollback or rewrite - File file = files.get(Collections.max(files.keySet())); - TarReader reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor); + String file = files.get(Collections.max(files.keySet())); + TarReader reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); if (reader != null) { return reader; } @@ -157,10 +122,10 @@ class TarReader implements Closeable { // collecting the entries (without touching the original file) and // writing them into an artificial tar file '.ro.bak' LinkedHashMap entries = newLinkedHashMap(); - collectFileEntries(file, entries, false); - file = findAvailGen(file, ".ro.bak"); - generateTarFile(entries, file, recovery, ioMonitor); - reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor); + collectFileEntries(file, entries, false, archiveManager); + file = findAvailGen(file, ".ro.bak", archiveManager); + generateTarFile(entries, file, recovery, archiveManager); + reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); if (reader != null) { return reader; } @@ -176,21 +141,16 @@ class TarReader implements Closeable { * into. * @param backup If {@code true}, performs a backup of the TAR file. 
*/ - private static void collectFileEntries(File file, LinkedHashMap entries, boolean backup) throws IOException { + private static void collectFileEntries(String file, LinkedHashMap entries, boolean backup, SegmentArchiveManager archiveManager) throws IOException { log.info("Recovering segments from tar file {}", file); try { - RandomAccessFile access = new RandomAccessFile(file, "r"); - try { - recoverEntries(file, access, entries); - } finally { - access.close(); - } + archiveManager.recoverEntries(file, entries); } catch (IOException e) { log.warn("Could not read tar file {}, skipping...", file, e); } if (backup) { - backupSafely(file); + backupSafely(archiveManager, file); } } @@ -202,12 +162,11 @@ class TarReader implements Closeable { * @param file The output file that will contain the recovered * entries. * @param recovery The recovery strategy to execute. - * @param ioMonitor An instance of {@link IOMonitor}. */ - private static void generateTarFile(LinkedHashMap entries, File file, TarRecovery recovery, IOMonitor ioMonitor) throws IOException { + private static void generateTarFile(LinkedHashMap entries, String file, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { log.info("Regenerating tar file {}", file); - try (TarWriter writer = new TarWriter(file, ioMonitor)) { + try (TarWriter writer = new TarWriter(archiveManager, file)) { for (Entry entry : entries.entrySet()) { try { recovery.recoverEntry(entry.getKey(), entry.getValue(), new EntryRecovery() { @@ -242,13 +201,13 @@ class TarReader implements Closeable { * * @param file File to backup. */ - private static void backupSafely(File file) throws IOException { - File backup = findAvailGen(file, ".bak"); - log.info("Backing up {} to {}", file, backup.getName()); - if (!file.renameTo(backup)) { + private static void backupSafely(SegmentArchiveManager archiveManager, String file) throws IOException { + String backup = findAvailGen(file, ".bak", archiveManager); + log.info("Backing up {} to {}", file, backup); + if (!archiveManager.renameTo(file, backup)) { log.warn("Renaming failed, so using copy to backup {}", file); - FileUtils.copyFile(file, backup); - if (!file.delete()) { + archiveManager.copyFile(file, backup); + if (!archiveManager.delete(file)) { throw new IOException( "Could not remove broken tar file " + file); } @@ -259,62 +218,29 @@ class TarReader implements Closeable { * Fine next available generation number so that a generated file doesn't * overwrite another existing file. * - * @param file The file to backup. + * @param name The file to backup. * @param ext The extension of the backed up file. */ - private static File findAvailGen(File file, String ext) { - File parent = file.getParentFile(); - String name = file.getName(); - File backup = new File(parent, name + ext); - for (int i = 2; backup.exists(); i++) { - backup = new File(parent, name + "." + i + ext); + private static String findAvailGen(String name, String ext, SegmentArchiveManager archiveManager) { + String backup = name + ext; + for (int i = 2; archiveManager.exists(backup); i++) { + backup = name + "." 
+ i + ext; } return backup; } - private static TarReader openFirstFileWithValidIndex(List files, boolean memoryMapping, IOMonitor ioMonitor) { - for (File file : files) { - String name = file.getName(); + private static TarReader openFirstFileWithValidIndex(List archives, SegmentArchiveManager archiveManager) { + for (String name : archives) { try { - RandomAccessFile access = new RandomAccessFile(file, "r"); - try { - Index index = loadAndValidateIndex(access, name); - if (index == null) { - log.info("No index found in tar file {}, skipping...", name); - } else { - // found a file with a valid index, drop the others - for (File other : files) { - if (other != file) { - log.info("Removing unused tar file {}", other.getName()); - other.delete(); - } - } - - if (memoryMapping) { - try { - FileAccess mapped = new FileAccess.Mapped(access); - return new TarReader(file, mapped, index, ioMonitor); - } catch (IOException e) { - log.warn("Failed to mmap tar file {}. Falling back to normal file " + - "IO, which will negatively impact repository performance. " + - "This problem may have been caused by restrictions on the " + - "amount of virtual memory available to the JVM. Please make " + - "sure that a 64-bit JVM is being used and that the process " + - "has access to unlimited virtual memory (ulimit option -v).", - name, e); - } + SegmentArchiveManager.SegmentArchiveReader reader = archiveManager.open(name); + if (reader != null) { + for (String other : archives) { + if (other != name) { + log.info("Removing unused tar file {}", other); + archiveManager.delete(other); } - - FileAccess random = new FileAccess.Random(access); - // prevent the finally block from closing the file - // as the returned TarReader will take care of that - access = null; - return new TarReader(file, random, index, ioMonitor); - } - } finally { - if (access != null) { - access.close(); } + return new TarReader(archiveManager, reader); } } catch (IOException e) { log.warn("Could not read tar file {}, skipping...", name, e); @@ -324,160 +250,22 @@ class TarReader implements Closeable { return null; } - /** - * Tries to read an existing index from the given tar file. The index is - * returned if it is found and looks valid (correct checksum, passes sanity - * checks). - * - * @param file The TAR file. - * @param name Name of the TAR file, for logging purposes. - * @return An instance of {@link ByteBuffer} populated with the content of - * the index. If the TAR doesn't contain any index, {@code null} is returned - * instead. - */ - private static Index loadAndValidateIndex(RandomAccessFile file, String name) throws IOException { - long length = file.length(); - - if (length % BLOCK_SIZE != 0) { - log.warn("Unable to load index of file {}: Invalid alignment", name); - return null; - } - if (length < 6 * BLOCK_SIZE) { - log.warn("Unable to load index of file {}: File too short", name); - return null; - } - if (length > Integer.MAX_VALUE) { - log.warn("Unable to load index of file {}: File too long", name); - return null; - } - - ReaderAtEnd r = (whence, size) -> { - ByteBuffer buffer = ByteBuffer.allocate(size); - file.seek(length - 2 * BLOCK_SIZE - whence); - file.readFully(buffer.array()); - return buffer; - }; - - try { - return indexLoader.loadIndex(r); - } catch (InvalidIndexException e) { - log.warn("Unable to load index of file {}: {}", name, e.getMessage()); - } - - return null; - } - - /** - * Scans through the tar file, looking for all segment entries. - * - * @param file The path of the TAR file. 
- * @param access The contents of the TAR file. - * @param entries The map that will contain the recovered entries. The - * entries are inserted in the {@link LinkedHashMap} in the - * order they appear in the TAR file. - */ - private static void recoverEntries(File file, RandomAccessFile access, LinkedHashMap entries) throws IOException { - byte[] header = new byte[BLOCK_SIZE]; - while (access.getFilePointer() + BLOCK_SIZE <= access.length()) { - // read the tar header block - access.readFully(header); - - // compute the header checksum - int sum = 0; - for (int i = 0; i < BLOCK_SIZE; i++) { - sum += header[i] & 0xff; - } - - // identify possible zero block - if (sum == 0 && access.getFilePointer() + 2 * BLOCK_SIZE == access.length()) { - return; // found the zero blocks at the end of the file - } - - // replace the actual stored checksum with spaces for comparison - for (int i = 148; i < 148 + 8; i++) { - sum -= header[i] & 0xff; - sum += ' '; - } - - byte[] checkbytes = String.format("%06o\0 ", sum).getBytes(UTF_8); - for (int i = 0; i < checkbytes.length; i++) { - if (checkbytes[i] != header[148 + i]) { - log.warn("Invalid entry checksum at offset {} in tar file {}, skipping...", - access.getFilePointer() - BLOCK_SIZE, file); - } - } - - // The header checksum passes, so read the entry name and size - ByteBuffer buffer = wrap(header); - String name = readString(buffer, 100); - buffer.position(124); - int size = readNumber(buffer, 12); - if (access.getFilePointer() + size > access.length()) { - // checksum was correct, so the size field should be accurate - log.warn("Partial entry {} in tar file {}, ignoring...", name, file); - return; - } - - Matcher matcher = NAME_PATTERN.matcher(name); - if (matcher.matches()) { - UUID id = UUID.fromString(matcher.group(1)); - - String checksum = matcher.group(3); - if (checksum != null || !entries.containsKey(id)) { - byte[] data = new byte[size]; - access.readFully(data); - - // skip possible padding to stay at block boundaries - long position = access.getFilePointer(); - long remainder = position % BLOCK_SIZE; - if (remainder != 0) { - access.seek(position + (BLOCK_SIZE - remainder)); - } - - if (checksum != null) { - CRC32 crc = new CRC32(); - crc.update(data); - if (crc.getValue() != Long.parseLong(checksum, 16)) { - log.warn("Checksum mismatch in entry {} of tar file {}, skipping...", - name, file); - continue; - } - } - - entries.put(id, data); - } - } else if (!name.equals(file.getName() + ".idx")) { - log.warn("Unexpected entry {} in tar file {}, skipping...", - name, file); - long position = access.getFilePointer() + size; - long remainder = position % BLOCK_SIZE; - if (remainder != 0) { - position += BLOCK_SIZE - remainder; - } - access.seek(position); - } - } - } - - private final File file; + private final SegmentArchiveManager archiveManager; - private final FileAccess access; + private final SegmentArchiveManager.SegmentArchiveReader archive; private final Index index; private volatile boolean hasGraph; - private final IOMonitor ioMonitor; - - private TarReader(File file, FileAccess access, Index index, IOMonitor ioMonitor) { - this.file = file; - this.access = access; - this.index = index; - this.ioMonitor = ioMonitor; + private TarReader(SegmentArchiveManager archiveManager, SegmentArchiveManager.SegmentArchiveReader archive) { + this.archiveManager = archiveManager; + this.archive = archive; + this.index = archive.getIndex(); } long size() { - return file.length(); + return archive.length(); } /** @@ -514,15 +302,7 @@ class 
TarReader implements Closeable { * @return the byte buffer, or null if not in this file. */ ByteBuffer readEntry(long msb, long lsb) throws IOException { - int idx = findEntry(msb, lsb); - if (idx == -1) { - return null; - } - return readEntry(msb, lsb, index.entry(idx)); - } - - private ByteBuffer readEntry(long msb, long lsb, IndexEntry entry) throws IOException { - return readSegment(msb, lsb, entry.getPosition(), entry.getLength()); + return archive.readSegment(msb, lsb); } /** @@ -705,7 +485,7 @@ class TarReader implements Closeable { * TarReader}, or {@code null}. */ TarReader sweep(@Nonnull Set reclaim, @Nonnull Set reclaimed) throws IOException { - String name = file.getName(); + String name = archive.getName(); log.debug("Cleaning up {}", name); Set cleaned = newHashSet(); @@ -716,13 +496,13 @@ class TarReader implements Closeable { TarEntry[] entries = getEntries(); for (int i = 0; i < entries.length; i++) { TarEntry entry = entries[i]; - beforeSize += getEntrySize(entry.size()); + beforeSize += archive.getEntrySize(entry.size()); UUID id = new UUID(entry.msb(), entry.lsb()); if (reclaim.contains(id)) { cleaned.add(id); entries[i] = null; } else { - afterSize += getEntrySize(entry.size()); + afterSize += archive.getEntrySize(entry.size()); afterCount += 1; } } @@ -737,7 +517,7 @@ class TarReader implements Closeable { // in which case we'll always generate a new tar file with // the graph to speed up future garbage collection runs. log.debug("Not enough space savings. ({}/{}). Skipping clean up of {}", - access.length() - afterSize, access.length(), name); + archive.length() - afterSize, archive.length(), name); return this; } if (!hasGraph()) { @@ -751,21 +531,18 @@ class TarReader implements Closeable { return this; } - File newFile = new File( - file.getParentFile(), - name.substring(0, pos) + (char) (generation + 1) + ".tar"); + String newFile = name.substring(0, pos) + (char) (generation + 1) + ".tar"; - log.debug("Writing new generation {}", newFile.getName()); - TarWriter writer = new TarWriter(newFile, ioMonitor); + log.debug("Writing new generation {}", newFile); + TarWriter writer = new TarWriter(archiveManager, newFile); for (TarEntry entry : entries) { if (entry != null) { long msb = entry.msb(); long lsb = entry.lsb(); - int offset = entry.offset(); int size = entry.size(); GCGeneration gen = entry.generation(); byte[] data = new byte[size]; - readSegment(msb, lsb, offset, size).get(data); + archive.readSegment(msb, lsb).get(data); writer.writeEntry(msb, lsb, data, 0, size, gen); } } @@ -809,19 +586,19 @@ class TarReader implements Closeable { writer.close(); - TarReader reader = openFirstFileWithValidIndex(singletonList(newFile), access.isMemoryMapped(), ioMonitor); + TarReader reader = openFirstFileWithValidIndex(singletonList(newFile), archiveManager); if (reader != null) { reclaimed.addAll(cleaned); return reader; } else { - log.warn("Failed to open cleaned up tar file {}", file); + log.warn("Failed to open cleaned up tar file {}", getFileName()); return this; } } @Override public void close() throws IOException { - access.close(); + archive.close(); } /** @@ -831,42 +608,11 @@ class TarReader implements Closeable { * @return The parsed graph, or {@code null} if one was not found. 
*/ Map> getGraph() throws IOException { - ByteBuffer graph = loadGraph(); - if (graph == null) { - return null; - } else { - return parseGraph(graph); - } + return archive.getGraph(); } private boolean hasGraph() { - if (!hasGraph) { - try { - loadGraph(); - } catch (IOException ignore) { } - } - return hasGraph; - } - - private int getIndexEntrySize() { - return getEntrySize(index.size()); - } - - private int getGraphEntrySize() { - ByteBuffer buffer; - - try { - buffer = loadGraph(); - } catch (IOException e) { - log.warn("Exception while loading pre-compiled tar graph", e); - return 0; - } - - if (buffer == null) { - return 0; - } - - return getEntrySize(buffer.getInt(buffer.limit() - 8)); + return archive.hasGraph(); } /** @@ -883,142 +629,27 @@ class TarReader implements Closeable { BinaryReferencesIndex getBinaryReferences() { BinaryReferencesIndex index = null; try { - index = loadBinaryReferences(); + index = archive.getBinaryReferences(); } catch (InvalidBinaryReferencesIndexException | IOException e) { log.warn("Exception while loading binary reference", e); } return index; } - private BinaryReferencesIndex loadBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException { - int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize() - getGraphEntrySize(); - return BinaryReferencesIndexLoader.loadBinaryReferencesIndex((whence, size) -> access.read(end - whence, size)); - } - - /** - * Loads the optional pre-compiled graph entry from the given tar file. - * - * @return graph buffer, or {@code null} if one was not found - * @throws IOException if the tar file could not be read - */ - private ByteBuffer loadGraph() throws IOException { - int pos = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize(); - - ByteBuffer meta = access.read(pos - 16, 16); - - int crc32 = meta.getInt(); - int count = meta.getInt(); - int bytes = meta.getInt(); - int magic = meta.getInt(); - - if (magic != GRAPH_MAGIC) { - log.warn("Invalid graph magic number in {}", file); - return null; - } - - if (count < 0) { - log.warn("Invalid number of entries in {}", file); - return null; - } - - if (bytes < 4 + count * 34) { - log.warn("Invalid entry size in {}", file); - return null; - } - - ByteBuffer graph = access.read(pos - bytes, bytes); - - byte[] b = new byte[bytes - 16]; - - graph.mark(); - graph.get(b); - graph.reset(); - - CRC32 checksum = new CRC32(); - checksum.update(b); - - if (crc32 != (int) checksum.getValue()) { - log.warn("Invalid graph checksum in tar file {}", file); - return null; - } - - hasGraph = true; - - return graph; - } - - private ByteBuffer readSegment(long msb, long lsb, int offset, int size) throws IOException { - ioMonitor.beforeSegmentRead(file, msb, lsb, size); - Stopwatch stopwatch = Stopwatch.createStarted(); - ByteBuffer buffer = access.read(offset, size); - long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); - ioMonitor.afterSegmentRead(file, msb, lsb, size, elapsed); - return buffer; - } - - private static Map> parseGraph(ByteBuffer buffer) { - int nEntries = buffer.getInt(buffer.limit() - 12); - - Map> graph = newHashMapWithExpectedSize(nEntries); - - for (int i = 0; i < nEntries; i++) { - long msb = buffer.getLong(); - long lsb = buffer.getLong(); - int nVertices = buffer.getInt(); - - List vertices = newArrayListWithCapacity(nVertices); - - for (int j = 0; j < nVertices; j++) { - long vMsb = buffer.getLong(); - long vLsb = buffer.getLong(); - vertices.add(new UUID(vMsb, vLsb)); - } - - graph.put(new UUID(msb, lsb), vertices); - } - - return 
graph; - } - - private static String readString(ByteBuffer buffer, int fieldSize) { - byte[] b = new byte[fieldSize]; - buffer.get(b); - int n = 0; - while (n < fieldSize && b[n] != 0) { - n++; - } - return new String(b, 0, n, UTF_8); - } - - private static int readNumber(ByteBuffer buffer, int fieldSize) { - byte[] b = new byte[fieldSize]; - buffer.get(b); - int number = 0; - for (int i = 0; i < fieldSize; i++) { - int digit = b[i] & 0xff; - if ('0' <= digit && digit <= '7') { - number = number * 8 + digit - '0'; - } else { - break; - } - } - return number; - } - /** * Return the path of this TAR file. * * @return An instance of {@link File}. */ - File getFile() { - return file; + String getFileName() { + return archive.getName(); } //------------------------------------------------------------< Object >-- @Override public String toString() { - return file.toString(); + return getFileName(); } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarWriter.java index 14e4f76a0b..b81f715fa5 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarWriter.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarWriter.java @@ -19,7 +19,6 @@ package org.apache.jackrabbit.oak.segment.file.tar; -import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkPositionIndexes; import static com.google.common.base.Preconditions.checkState; @@ -33,20 +32,15 @@ import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.GRAPH_MAGI import static org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexWriter.newBinaryReferencesIndexWriter; import java.io.Closeable; -import java.io.File; -import java.io.FileDescriptor; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; -import com.google.common.base.Stopwatch; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexWriter; import org.apache.jackrabbit.oak.segment.file.tar.index.IndexWriter; import org.slf4j.Logger; @@ -61,39 +55,8 @@ class TarWriter implements Closeable { /** Logger instance */ private static final Logger log = LoggerFactory.getLogger(TarWriter.class); - private static final byte[] ZERO_BYTES = new byte[BLOCK_SIZE]; - - static int getPaddingSize(int size) { - int remainder = size % BLOCK_SIZE; - if (remainder > 0) { - return BLOCK_SIZE - remainder; - } else { - return 0; - } - } - private final int writeIndex; - /** - * The file being written. This instance is also used as an additional - * synchronization point by {@link #flush()} and {@link #close()} to - * allow {@link #flush()} to work concurrently with normal reads and - * writes, but not with a concurrent {@link #close()}. - */ - private final File file; - - private final FileStoreMonitor monitor; - - /** - * File handle. Initialized lazily in {@link #writeEntry(UUID, byte[], - * byte[], int, int, GCGeneration)} to avoid creating an extra empty file - * when just reading from the repository. 
Should only be accessed from - * synchronized code. - */ - private RandomAccessFile access = null; - - private FileChannel channel = null; - /** * Flag to indicate a closed writer. Accessing a closed writer is illegal. * Should only be accessed from synchronized code. @@ -121,24 +84,30 @@ class TarWriter implements Closeable { */ private final Map> graph = newHashMap(); - private final IOMonitor ioMonitor; + private final SegmentArchiveManager archiveManager; + + private final SegmentArchiveManager.SegmentArchiveWriter archive; + + /** This object is used as an additional + * synchronization point by {@link #flush()} and {@link #close()} to + * allow {@link #flush()} to work concurrently with normal reads and + * writes, but not with a concurrent {@link #close()}. */ + private final Object closeMonitor = new Object(); /** * Used for maintenance operations (GC or recovery) via the TarReader and * tests */ - TarWriter(File file, IOMonitor ioMonitor) { - this.file = file; - this.monitor = new FileStoreMonitorAdapter(); + TarWriter(SegmentArchiveManager archiveManager, String archiveName) { + this.archiveManager = archiveManager; + this.archive = archiveManager.create(archiveName); this.writeIndex = -1; - this.ioMonitor = ioMonitor; } - TarWriter(File directory, FileStoreMonitor monitor, int writeIndex, IOMonitor ioMonitor) { - this.file = new File(directory, format(FILE_NAME_FORMAT, writeIndex, "a")); - this.monitor = monitor; + TarWriter(SegmentArchiveManager archiveManager, int writeIndex) { + this.archiveManager = archiveManager; + this.archive = archiveManager.create(format(FILE_NAME_FORMAT, writeIndex, "a")); this.writeIndex = writeIndex; - this.ioMonitor = ioMonitor; } synchronized boolean containsEntry(long msb, long lsb) { @@ -155,18 +124,13 @@ class TarWriter implements Closeable { * @return the byte buffer, or null if not in this file */ ByteBuffer readEntry(long msb, long lsb) throws IOException { - checkState(!closed); - TarEntry entry; synchronized (this) { + checkState(!closed); entry = index.get(new UUID(msb, lsb)); } if (entry != null) { - checkState(channel != null); // implied by entry != null - ByteBuffer data = ByteBuffer.allocate(entry.size()); - channel.read(data, entry.offset()); - data.rewind(); - return data; + return archive.readSegment(entry); } else { return null; } @@ -176,50 +140,17 @@ class TarWriter implements Closeable { checkNotNull(data); checkPositionIndexes(offset, offset + size, data.length); - UUID uuid = new UUID(msb, lsb); - CRC32 checksum = new CRC32(); - checksum.update(data, offset, size); - String entryName = String.format("%s.%08x", uuid, checksum.getValue()); - byte[] header = newEntryHeader(entryName, size); - - log.debug("Writing segment {} to {}", uuid, file); - return writeEntry(uuid, header, data, offset, size, generation); - } - - private synchronized long writeEntry(UUID uuid, byte[] header, byte[] data, int offset, int size, GCGeneration generation) throws IOException { - checkState(!closed); - - if (access == null) { - access = new RandomAccessFile(file, "rw"); - channel = access.getChannel(); - } - - long msb = uuid.getMostSignificantBits(); - long lsb = uuid.getLeastSignificantBits(); - - int padding = getPaddingSize(size); - - long initialLength = access.getFilePointer(); + synchronized (this) { + checkState(!closed); - access.write(header); + TarEntry entry = archive.writeSegment(msb, lsb, data, offset, size, generation); + long currentLength = archive.getLength(); - ioMonitor.beforeSegmentWrite(file, msb, lsb, size); - Stopwatch 
stopwatch = Stopwatch.createStarted(); - access.write(data, offset, size); - ioMonitor.afterSegmentWrite(file, msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS)); + checkState(currentLength <= Integer.MAX_VALUE); + index.put(new UUID(msb, lsb), entry); - if (padding > 0) { - access.write(ZERO_BYTES, 0, padding); + return currentLength; } - - long currentLength = access.getFilePointer(); - monitor.written(currentLength - initialLength); - - checkState(currentLength <= Integer.MAX_VALUE); - TarEntry entry = new TarEntry(msb, lsb, (int) (currentLength - size - padding), size, generation); - index.put(uuid, entry); - - return currentLength; } void addBinaryReference(GCGeneration generation, UUID segmentId, String reference) { @@ -246,17 +177,15 @@ class TarWriter implements Closeable { * @throws IOException if the tar file could not be flushed */ void flush() throws IOException { - synchronized (file) { - FileDescriptor descriptor = null; + synchronized (closeMonitor) { + boolean doFlush; synchronized (this) { - if (access != null && !closed) { - descriptor = access.getFD(); - } + doFlush = archive.isCreated() && !closed; } - if (descriptor != null) { - descriptor.sync(); + if (doFlush) { + archive.flush(); } } } @@ -277,28 +206,21 @@ class TarWriter implements Closeable { } // If nothing was written to this file, then we're already done. - if (access == null) { + if (!archive.isCreated()) { return; } // Complete the tar file by adding the graph, the index and the - // trailing two zero blocks. This code is synchronized on the file - // instance to ensure that no concurrent thread is still flushing + // trailing two zero blocks. This code is synchronized on the closeMonitor + // to ensure that no concurrent thread is still flushing // the file when we close the file handle. - long initialPosition, currentPosition; - synchronized (file) { - initialPosition = access.getFilePointer(); + synchronized (closeMonitor) { writeBinaryReferences(); writeGraph(); writeIndex(); - access.write(ZERO_BYTES); - access.write(ZERO_BYTES); - currentPosition = access.getFilePointer(); - access.close(); + archive.close(); } - - monitor.written(currentPosition - initialPosition); } /** @@ -311,24 +233,17 @@ class TarWriter implements Closeable { checkState(writeIndex >= 0); // If nothing was written to this file, then we're already done. 
synchronized (this) { - if (access == null) { + if (!archive.isCreated()) { return this; } } close(); int newIndex = writeIndex + 1; - return new TarWriter(file.getParentFile(), monitor, newIndex, ioMonitor); + return new TarWriter(archiveManager, newIndex); } private void writeBinaryReferences() throws IOException { - byte[] data = binaryReferences.write(); - int paddingSize = getPaddingSize(data.length); - byte[] header = newEntryHeader(file.getName() + ".brf", data.length + paddingSize); - access.write(header); - if (paddingSize > 0) { - access.write(ZERO_BYTES, 0, paddingSize); - } - access.write(data); + archive.writeBinaryReferences(binaryReferences.write()); } private void writeGraph() throws IOException { @@ -391,15 +306,7 @@ class TarWriter implements Closeable { buffer.putInt(graphSize); buffer.putInt(GRAPH_MAGIC); - int padding = getPaddingSize(graphSize); - - access.write(newEntryHeader(file.getName() + ".gph", graphSize + padding)); - - if (padding > 0) { - access.write(ZERO_BYTES, 0, padding); - } - - access.write(buffer.array()); + archive.writeGraph(buffer.array()); } private void writeIndex() throws IOException { @@ -418,70 +325,15 @@ class TarWriter implements Closeable { } byte[] index = writer.write(); - access.write(newEntryHeader(file.getName() + ".idx", index.length)); - access.write(index); - } - - private static byte[] newEntryHeader(String name, int size) { - byte[] header = new byte[BLOCK_SIZE]; - - // File name - byte[] nameBytes = name.getBytes(UTF_8); - System.arraycopy( - nameBytes, 0, header, 0, Math.min(nameBytes.length, 100)); - - // File mode - System.arraycopy( - String.format("%07o", 0400).getBytes(UTF_8), 0, - header, 100, 7); - - // User's numeric user ID - System.arraycopy( - String.format("%07o", 0).getBytes(UTF_8), 0, - header, 108, 7); - - // Group's numeric user ID - System.arraycopy( - String.format("%07o", 0).getBytes(UTF_8), 0, - header, 116, 7); - - // File size in bytes (octal basis) - System.arraycopy( - String.format("%011o", size).getBytes(UTF_8), 0, - header, 124, 11); - - // Last modification time in numeric Unix time format (octal) - long time = System.currentTimeMillis() / 1000; - System.arraycopy( - String.format("%011o", time).getBytes(UTF_8), 0, - header, 136, 11); - - // Checksum for header record - System.arraycopy( - new byte[] {' ', ' ', ' ', ' ', ' ', ' ', ' ', ' '}, 0, - header, 148, 8); - - // Type flag - header[156] = '0'; - - // Compute checksum - int checksum = 0; - for (byte aHeader : header) { - checksum += aHeader & 0xff; - } - System.arraycopy( - String.format("%06o\0 ", checksum).getBytes(UTF_8), 0, - header, 148, 8); - - return header; + archive.writeIndex(index); } synchronized long fileLength() { - return file.length(); + return archive.getLength(); } - synchronized File getFile() { - return file; + synchronized String getFileName() { + return archive.getName(); } synchronized boolean isClosed() { @@ -492,7 +344,7 @@ class TarWriter implements Closeable { @Override public String toString() { - return file.toString(); + return getFileName(); } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java index 66a23c0c3a..bae32e626a 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java @@ -55,6 
+55,7 @@ import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; import org.apache.jackrabbit.oak.segment.file.JournalEntry; import org.apache.jackrabbit.oak.segment.file.JournalReader; import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore; +import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -122,7 +123,7 @@ public class ConsistencyChecker implements Closeable { PrintWriter errWriter ) throws IOException, InvalidFileStoreVersionException { try ( - JournalReader journal = new JournalReader(new File(directory, journalFileName)); + JournalReader journal = new JournalReader(new LocalJournalFile(directory, journalFileName)); ConsistencyChecker checker = new ConsistencyChecker(directory, debugInterval, ioStatistics, outWriter, errWriter) ) { Map pathToJournalEntry = newHashMap(); diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java index 9f95332316..ea85dea207 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java @@ -36,6 +36,7 @@ import com.google.common.collect.Iterators; import org.apache.jackrabbit.oak.json.BlobSerializer; import org.apache.jackrabbit.oak.json.JsonSerializer; import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFile; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; import org.apache.jackrabbit.oak.segment.file.JournalEntry; import org.apache.jackrabbit.oak.segment.file.JournalReader; @@ -75,7 +76,7 @@ public class RevisionHistory { * @return * @throws IOException */ - public Iterator getHistory(@Nonnull File journal, @Nonnull final String path) + public Iterator getHistory(@Nonnull JournalFile journal, @Nonnull final String path) throws IOException { checkNotNull(path); diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java index 60cac1f8a9..0fc0e7adfb 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java @@ -25,16 +25,18 @@ import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreB import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import javax.annotation.CheckForNull; import javax.annotation.Nullable; import org.apache.jackrabbit.oak.segment.SegmentCache; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFile; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFileWriter; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; import org.apache.jackrabbit.oak.segment.file.JournalReader; +import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; /** * Perform an offline compaction of an existing segment store. 
@@ -160,17 +162,16 @@ public class Compact implements Runnable { System.out.println(" -> cleaning up"); try (FileStore store = newFileStore()) { store.cleanup(); - File journal = new File(path, "journal.log"); + JournalFile journal = new LocalJournalFile(path, "journal.log"); String head; try (JournalReader journalReader = new JournalReader(journal)) { - head = journalReader.next().getRevision() + " root " + System.currentTimeMillis() + "\n"; + head = journalReader.next().getRevision() + " root " + System.currentTimeMillis(); } - try (RandomAccessFile journalFile = new RandomAccessFile(journal, "rw")) { + try (JournalFileWriter journalWriter = journal.openJournalWriter()) { System.out.println(" -> writing new " + journal.getName() + ": " + head); - journalFile.setLength(0); - journalFile.writeBytes(head); - journalFile.getChannel().force(false); + journalWriter.truncate(); + journalWriter.writeLine(head); } } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/History.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/History.java index 216d91f9bd..914739038e 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/History.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/History.java @@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.File; import java.util.Iterator; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFile; +import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; import org.apache.jackrabbit.oak.segment.file.tooling.RevisionHistory; import org.apache.jackrabbit.oak.segment.file.tooling.RevisionHistory.HistoryElement; @@ -123,7 +125,7 @@ public class History implements Runnable { private final File path; - private final File journal; + private final JournalFile journal; private final String node; @@ -131,7 +133,7 @@ public class History implements Runnable { private History(Builder builder) { this.path = builder.path; - this.journal = builder.journal; + this.journal = new LocalJournalFile(builder.journal); this.node = builder.node; this.depth = builder.depth; } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Utils.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Utils.java index 6b7f8d9fbe..ffa0a99ae6 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Utils.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Utils.java @@ -30,10 +30,12 @@ import javax.annotation.Nonnull; import com.google.common.base.Function; import com.google.common.collect.Iterators; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFile; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; import org.apache.jackrabbit.oak.segment.file.JournalEntry; import org.apache.jackrabbit.oak.segment.file.JournalReader; import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore; +import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; import org.apache.jackrabbit.oak.segment.file.tooling.BasicReadOnlyBlobStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; @@ -65,7 +67,7 @@ final class Utils { } static List readRevisions(File store) { - File journal = new File(store, "journal.log"); + JournalFile journal = new LocalJournalFile(store, "journal.log"); if (journal.exists()) { try (JournalReader journalReader = new 
JournalReader(journal)) { diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/GcJournalTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/GcJournalTest.java index cbbc70011e..75a0739fc0 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/GcJournalTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/GcJournalTest.java @@ -49,7 +49,7 @@ public class GcJournalTest { @Test public void tarGcJournal() throws Exception { File directory = segmentFolder.newFolder(); - GCJournal gc = new GCJournal(directory); + GCJournal gc = new GCJournal(new LocalGCJournalFile(directory, GCJournal.GC_JOURNAL)); gc.persist(0, 100, newGCGeneration(1, 0, false), 50, RecordId.NULL.toString10()); GCJournalEntry e0 = gc.read(); @@ -91,24 +91,24 @@ public class GcJournalTest { @Test public void testGCGeneration() throws Exception { - GCJournal out = new GCJournal(segmentFolder.getRoot()); + GCJournal out = new GCJournal(new LocalGCJournalFile(segmentFolder.getRoot(), GCJournal.GC_JOURNAL)); out.persist(1, 100, newGCGeneration(1, 2, false), 50, RecordId.NULL.toString()); - GCJournal in = new GCJournal(segmentFolder.getRoot()); + GCJournal in = new GCJournal(new LocalGCJournalFile(segmentFolder.getRoot(), GCJournal.GC_JOURNAL)); assertEquals(newGCGeneration(1, 2, false), in.read().getGcGeneration()); } @Test public void testGCGenerationCompactedFlagCleared() throws Exception { - GCJournal out = new GCJournal(segmentFolder.getRoot()); + GCJournal out = new GCJournal(new LocalGCJournalFile(segmentFolder.getRoot(), GCJournal.GC_JOURNAL)); out.persist(1, 100, newGCGeneration(1, 2, true), 50, RecordId.NULL.toString()); - GCJournal in = new GCJournal(segmentFolder.getRoot()); + GCJournal in = new GCJournal(new LocalGCJournalFile(segmentFolder.getRoot(), GCJournal.GC_JOURNAL)); assertEquals(newGCGeneration(1, 2, false), in.read().getGcGeneration()); } @Test public void testReadOak16GCLog() throws IOException { createOak16GCLog(); - GCJournal gcJournal = new GCJournal(segmentFolder.getRoot()); + GCJournal gcJournal = new GCJournal(new LocalGCJournalFile(segmentFolder.getRoot(), GCJournal.GC_JOURNAL)); GCJournalEntry entry = gcJournal.read(); assertEquals(45919825920L, entry.getRepoSize()); assertEquals(41394306048L, entry.getReclaimedSize()); @@ -121,7 +121,7 @@ public class GcJournalTest { @Test public void testUpdateOak16GCLog() throws IOException { createOak16GCLog(); - GCJournal gcJournal = new GCJournal(segmentFolder.getRoot()); + GCJournal gcJournal = new GCJournal(new LocalGCJournalFile(segmentFolder.getRoot(), GCJournal.GC_JOURNAL)); gcJournal.persist(75, 300, newGCGeneration(3, 0, false), 125, "bar"); ArrayList entries = newArrayList(gcJournal.readAll()); diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/JournalEntryTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/JournalEntryTest.java index 000efc13e2..a2d48969a8 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/JournalEntryTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/JournalEntryTest.java @@ -32,6 +32,7 @@ import com.google.common.base.Splitter; import com.google.common.io.Files; import org.apache.jackrabbit.oak.segment.SegmentNodeStore; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; import 
org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -79,7 +80,7 @@ public class JournalEntryTest { long entryTime = Long.valueOf(parts.get(2)); assertTrue(entryTime >= startTime); - JournalReader jr = new JournalReader(journal); + JournalReader jr = new JournalReader(new LocalJournalFile(journal)); JournalEntry journalEntry = jr.next(); assertEquals(journalParts(lines.get(lines.size() - 1)).get(0), journalEntry.getRevision()); assertEquals(journalParts(lines.get(lines.size() - 1)).get(2), String.valueOf(journalEntry.getTimestamp())); diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/JournalReaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/JournalReaderTest.java index a36d6b1637..a97eb4dc76 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/JournalReaderTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/JournalReaderTest.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import com.google.common.collect.Iterators; +import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -144,7 +145,7 @@ public class JournalReaderTest { private JournalReader createJournalReader(String s) throws IOException { File journalFile = folder.newFile("jrt"); write(journalFile, s); - return new JournalReader(journalFile); + return new JournalReader(new LocalJournalFile(journalFile)); } } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ManifestCheckerTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ManifestCheckerTest.java index d8e2ad46b0..8225a56f4e 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ManifestCheckerTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ManifestCheckerTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.nio.file.Files; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.ManifestFile; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -33,16 +34,19 @@ public class ManifestCheckerTest { @Rule public TemporaryFolder root = new TemporaryFolder(new File("target")); - private File manifest; + private File file; + + private ManifestFile manifest; @Before public void setUp() throws Exception { - manifest = root.newFile(); + file = root.newFile(); + manifest = new LocalManifestFile(file); } @Test(expected = InvalidFileStoreVersionException.class) public void testManifestShouldExist() throws Exception { - Files.delete(manifest.toPath()); + Files.delete(file.toPath()); newManifestChecker(manifest, true, 1, 2).checkManifest(); } @@ -81,7 +85,7 @@ public class ManifestCheckerTest { @Test public void testUpdateNonExistingManifest() throws Exception { - Files.delete(manifest.toPath()); + Files.delete(file.toPath()); newManifestChecker(manifest, false, 2, 3).checkAndUpdateManifest(); assertEquals(3, Manifest.load(manifest).getStoreVersion(0)); } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ManifestTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ManifestTest.java index 1eb6ac0c92..aab97ebe4c 100644 --- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ManifestTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ManifestTest.java @@ -32,12 +32,12 @@ public class ManifestTest { @Test public void defaultStoreVersionShouldBeReturned() throws Exception { - assertEquals(42, Manifest.load(folder.newFile()).getStoreVersion(42)); + assertEquals(42, Manifest.load(new LocalManifestFile(folder.newFile())).getStoreVersion(42)); } @Test public void storeVersionShouldBeReturned() throws Exception { - File file = folder.newFile(); + LocalManifestFile file = new LocalManifestFile(folder.newFile()); Manifest write = Manifest.empty(); write.setStoreVersion(42); diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SizeDeltaGCEstimationTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SizeDeltaGCEstimationTest.java index 4ceff195c2..7b2931a021 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SizeDeltaGCEstimationTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SizeDeltaGCEstimationTest.java @@ -37,7 +37,7 @@ public class SizeDeltaGCEstimationTest { @Before public void setUpJournal() throws Exception { - journal = new GCJournal(folder.getRoot()); + journal = new GCJournal(new LocalGCJournalFile(folder.getRoot(), GCJournal.GC_JOURNAL)); } @Test diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java index d35b762c7f..9fada856eb 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java @@ -46,6 +46,8 @@ import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.SegmentNodeBuilder; import org.apache.jackrabbit.oak.segment.SegmentNodeState; import org.apache.jackrabbit.oak.segment.SegmentReader; +import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; +import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -79,14 +81,14 @@ public class TarRevisionsTest { @Test(expected = IllegalStateException.class) public void unboundRevisions() throws IOException { - try (TarRevisions tarRevisions = new TarRevisions(folder.getRoot())) { + try (TarRevisions tarRevisions = new TarRevisions(new TarPersistence(folder.getRoot()))) { tarRevisions.getHead(); } } @Nonnull private JournalReader createJournalReader() throws IOException { - return new JournalReader(new File(getFileStoreFolder(), TarRevisions.JOURNAL_FILE_NAME)); + return new JournalReader(new LocalJournalFile(getFileStoreFolder(), TarRevisions.JOURNAL_FILE_NAME)); } @Test diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tar/TarFileTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tar/TarFileTest.java index 15cbc6da90..79ff22c6f6 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tar/TarFileTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tar/TarFileTest.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; import org.junit.Before; import org.junit.Rule; 
import org.junit.Test; @@ -43,18 +44,22 @@ import org.junit.rules.TemporaryFolder; public class TarFileTest { - private static GCGeneration generation(int full) { + protected static GCGeneration generation(int full) { return newGCGeneration(full, 0, false); } - private File file; - @Rule public TemporaryFolder folder = new TemporaryFolder(new File("target")); + protected SegmentArchiveManager archiveManager; + @Before public void setUp() throws IOException { - file = folder.newFile(); + archiveManager = new SegmentTarManager(folder.newFolder(), new FileStoreMonitorAdapter(), new IOMonitorAdapter(), false); + } + + protected long getWriteAndReadExpectedSize() { + return 5120; } @Test @@ -64,14 +69,13 @@ public class TarFileTest { long lsb = id.getLeastSignificantBits() & (-1 >>> 4); // OAK-1672 byte[] data = "Hello, World!".getBytes(UTF_8); - try (TarWriter writer = new TarWriter(file, new IOMonitorAdapter())) { + try (TarWriter writer = new TarWriter(archiveManager, "data00000a.tar")) { writer.writeEntry(msb, lsb, data, 0, data.length, generation(0)); assertEquals(ByteBuffer.wrap(data), writer.readEntry(msb, lsb)); } - assertEquals(5120, file.length()); - - try (TarReader reader = TarReader.open(file, false, new IOMonitorAdapter())) { + try (TarReader reader = TarReader.open("data00000a.tar", archiveManager)) { + assertEquals(getWriteAndReadExpectedSize(), reader.size()); assertEquals(ByteBuffer.wrap(data), reader.readEntry(msb, lsb)); } } @@ -84,11 +88,11 @@ public class TarFileTest { String data = "test"; byte[] buffer = data.getBytes(UTF_8); - try (TarWriter writer = new TarWriter(file, new IOMonitorAdapter())) { + try (TarWriter writer = new TarWriter(archiveManager, "data00000a.tar")) { writer.writeEntry(msb, lsb, buffer, 0, buffer.length, newGCGeneration(1, 2, false)); } - try (TarReader reader = TarReader.open(file, false, new IOMonitorAdapter())) { + try (TarReader reader = TarReader.open("data00000a.tar", archiveManager)) { TarEntry[] entries = reader.getEntries(); assertEquals(newGCGeneration(1, 2, false), entries[0].generation()); } @@ -102,11 +106,11 @@ public class TarFileTest { String data = "test"; byte[] buffer = data.getBytes(UTF_8); - try (TarWriter writer = new TarWriter(file, new IOMonitorAdapter())) { + try (TarWriter writer = new TarWriter(archiveManager, "data00000a.tar")) { writer.writeEntry(msb, lsb, buffer, 0, buffer.length, newGCGeneration(1, 2, true)); } - try (TarReader reader = TarReader.open(file, false, new IOMonitorAdapter())) { + try (TarReader reader = TarReader.open("data00000a.tar", archiveManager)) { TarEntry[] entries = reader.getEntries(); assertEquals(newGCGeneration(1, 2, true), entries[0].generation()); } @@ -114,7 +118,7 @@ public class TarFileTest { @Test public void testWriteAndReadBinaryReferences() throws Exception { - try (TarWriter writer = new TarWriter(file, new IOMonitorAdapter())) { + try (TarWriter writer = new TarWriter(archiveManager, "data00000a.tar")) { writer.writeEntry(0x00, 0x00, new byte[] {0x01, 0x02, 0x3}, 0, 3, generation(0)); writer.addBinaryReference(generation(1), new UUID(1, 0), "r0"); @@ -154,7 +158,7 @@ public class TarFileTest { expected.put(generation(2), two); expected.put(generation(3), three); - try (TarReader reader = TarReader.open(file, false, new IOMonitorAdapter())) { + try (TarReader reader = TarReader.open("data00000a.tar", archiveManager)) { Map>> actual = new HashMap<>(); reader.getBinaryReferences().forEach((generation, full, compacted, id, reference) -> { @@ -170,7 +174,7 @@ public class TarFileTest { 
@Test public void binaryReferencesIndexShouldBeTrimmedDownOnSweep() throws Exception { - try (TarWriter writer = new TarWriter(file, new IOMonitorAdapter())) { + try (TarWriter writer = new TarWriter(archiveManager, "data00000a.tar")) { writer.writeEntry(1, 1, new byte[] {1}, 0, 1, generation(1)); writer.writeEntry(1, 2, new byte[] {1}, 0, 1, generation(1)); writer.writeEntry(2, 1, new byte[] {1}, 0, 1, generation(2)); @@ -185,7 +189,7 @@ public class TarFileTest { Set sweep = newSet(new UUID(1, 1), new UUID(2, 2)); - try (TarReader reader = TarReader.open(file, false, new IOMonitorAdapter())) { + try (TarReader reader = TarReader.open("data00000a.tar", archiveManager)) { try (TarReader swept = reader.sweep(sweep, new HashSet<>())) { assertNotNull(swept); @@ -214,12 +218,12 @@ public class TarFileTest { @Test public void binaryReferencesIndexShouldContainCompleteGCGeneration() throws Exception { - try (TarWriter writer = new TarWriter(file, new IOMonitorAdapter())) { + try (TarWriter writer = new TarWriter(archiveManager, "data00000a.tar")) { writer.writeEntry(0x00, 0x00, new byte[] {0x01, 0x02, 0x3}, 0, 3, generation(0)); writer.addBinaryReference(newGCGeneration(1, 2, false), new UUID(1, 2), "r1"); writer.addBinaryReference(newGCGeneration(3, 4, true), new UUID(3, 4), "r2"); } - try (TarReader reader = TarReader.open(file, false, new IOMonitorAdapter())) { + try (TarReader reader = TarReader.open("data00000a.tar", archiveManager)) { Set expected = new HashSet<>(); expected.add(newGCGeneration(1, 2, false)); expected.add(newGCGeneration(3, 4, true)); @@ -233,7 +237,7 @@ public class TarFileTest { @Test public void graphShouldBeTrimmedDownOnSweep() throws Exception { - try (TarWriter writer = new TarWriter(file, new IOMonitorAdapter())) { + try (TarWriter writer = new TarWriter(archiveManager, "data00000a.tar")) { writer.writeEntry(1, 1, new byte[] {1}, 0, 1, generation(1)); writer.writeEntry(1, 2, new byte[] {1}, 0, 1, generation(1)); writer.writeEntry(1, 3, new byte[] {1}, 0, 1, generation(1)); @@ -249,7 +253,7 @@ public class TarFileTest { Set sweep = newSet(new UUID(1, 2), new UUID(2, 3)); - try (TarReader reader = TarReader.open(file, false, new IOMonitorAdapter())) { + try (TarReader reader = TarReader.open("data00000a.tar", archiveManager)) { try (TarReader swept = reader.sweep(sweep, new HashSet())) { assertNotNull(swept); diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tar/TarWriterTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tar/TarWriterTest.java index c49e57e7e1..54ddd1135e 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tar/TarWriterTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tar/TarWriterTest.java @@ -29,6 +29,8 @@ import java.io.File; import java.io.IOException; import java.util.UUID; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -38,15 +40,25 @@ public class TarWriterTest { @Rule public TemporaryFolder folder = new TemporaryFolder(new File("target")); + protected SegmentArchiveManager archiveManager; + + protected TestFileStoreMonitor monitor; + + @Before + public void setUp() throws IOException { + monitor = new TestFileStoreMonitor(); + archiveManager = new SegmentTarManager(folder.newFolder(), monitor, new IOMonitorAdapter(), false); + } + @Test public void createNextGenerationTest() throws 
IOException { int counter = 2222; - TarWriter t0 = new TarWriter(folder.newFolder(), new FileStoreMonitorAdapter(), counter, new IOMonitorAdapter()); + TarWriter t0 = new TarWriter(archiveManager, counter); // not dirty, will not create a new writer TarWriter t1 = t0.createNextGeneration(); assertEquals(t0, t1); - assertTrue(t1.getFile().getName().contains("" + counter)); + assertTrue(t1.getFileName().contains("" + counter)); // dirty, will create a new writer UUID id = UUID.randomUUID(); @@ -58,10 +70,10 @@ public class TarWriterTest { TarWriter t2 = t1.createNextGeneration(); assertNotEquals(t1, t2); assertTrue(t1.isClosed()); - assertTrue(t2.getFile().getName().contains("" + (counter + 1))); + assertTrue(t2.getFileName().contains("" + (counter + 1))); } - private static class TestFileStoreMonitor extends FileStoreMonitorAdapter { + public static class TestFileStoreMonitor extends FileStoreMonitorAdapter { long written; @@ -74,8 +86,7 @@ public class TarWriterTest { @Test public void testFileStoreMonitor() throws Exception { - TestFileStoreMonitor monitor = new TestFileStoreMonitor(); - try (TarWriter writer = new TarWriter(folder.getRoot(), monitor, 0, new IOMonitorAdapter())) { + try (TarWriter writer = new TarWriter(archiveManager, 0)) { long sizeBefore = writer.fileLength(); long writtenBefore = monitor.written; writer.writeEntry(0, 0, new byte[42], 0, 42, newGCGeneration(0, 0, false)); diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/upgrade/UpgradeIT.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/upgrade/UpgradeIT.java index 02b9076e7e..0be1151f51 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/upgrade/UpgradeIT.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/upgrade/UpgradeIT.java @@ -39,6 +39,7 @@ import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.segment.SegmentVersion; import org.apache.jackrabbit.oak.segment.data.SegmentData; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.LocalManifestFile; import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; import org.apache.jackrabbit.oak.segment.tool.Compact; @@ -124,7 +125,7 @@ public class UpgradeIT { } private void checkStoreVersion(int version) throws IOException, InvalidFileStoreVersionException { - newManifestChecker(new File(fileStoreHome.getRoot(), "manifest"), + newManifestChecker(new LocalManifestFile(fileStoreHome.getRoot(), "manifest"), true, version, version).checkManifest(); }
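
Usage sketch (illustrative only, not part of this patch): the test changes above construct TarWriter and TarReader through the new SegmentArchiveManager instead of a java.io.File. The following round-trip example mirrors the updated TarFileTest. It assumes the same package as the tests, because TarWriter, TarReader and SegmentTarManager are package-scoped, and it reuses the SegmentTarManager constructor exactly as shown in TarFileTest#setUp (the trailing boolean is passed as false, as in the test; its meaning is not claimed here).

package org.apache.jackrabbit.oak.segment.file.tar;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.UUID;

import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;

class ArchiveRoundTripSketch {

    static void roundTrip(File segmentStoreDir) throws Exception {
        // Tar-backed implementation of the new SegmentArchiveManager abstraction,
        // constructed as in TarFileTest#setUp.
        SegmentArchiveManager archiveManager = new SegmentTarManager(
                segmentStoreDir, new FileStoreMonitorAdapter(), new IOMonitorAdapter(), false);

        UUID id = UUID.randomUUID();
        long msb = id.getMostSignificantBits();
        long lsb = id.getLeastSignificantBits();
        byte[] data = "Hello, World!".getBytes(UTF_8);

        // TarWriter now only knows the archive name; all file I/O goes through
        // the SegmentArchiveWriter created by the manager.
        try (TarWriter writer = new TarWriter(archiveManager, "data00000a.tar")) {
            writer.writeEntry(msb, lsb, data, 0, data.length, newGCGeneration(0, 0, false));
        }

        // TarReader is likewise opened by archive name through the same manager.
        try (TarReader reader = TarReader.open("data00000a.tar", archiveManager)) {
            ByteBuffer segment = reader.readEntry(msb, lsb);
            System.out.println("read " + segment.remaining() + " bytes for " + id);
        }
    }
}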
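
Usage sketch (illustrative only, not part of this patch): the journal is now reached through the SegmentNodeStorePersistence.JournalFile abstraction rather than java.io.File. The class and method names of the sketch below are made up, but the JournalFile, JournalFileWriter and JournalReader calls follow Utils.readRevisions and Compact as changed above; the comment about writeLine adding the line terminator is an assumption based on Compact dropping its explicit "\n".

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFile;
import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.JournalFileWriter;
import org.apache.jackrabbit.oak.segment.file.JournalReader;
import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile;

class JournalSketch {

    // Lists the revisions recorded in journal.log, newest first,
    // following the pattern used by Utils.readRevisions.
    static List<String> readRevisions(File segmentStoreDir) throws IOException {
        List<String> revisions = new ArrayList<>();
        JournalFile journal = new LocalJournalFile(segmentStoreDir, "journal.log");
        if (!journal.exists()) {
            return revisions;
        }
        try (JournalReader reader = new JournalReader(journal)) {
            while (reader.hasNext()) {
                revisions.add(reader.next().getRevision());
            }
        }
        return revisions;
    }

    // Rewrites journal.log so that it only contains the current head entry,
    // the same sequence of calls used by Compact after cleanup.
    static void rewriteHead(File segmentStoreDir) throws IOException {
        JournalFile journal = new LocalJournalFile(segmentStoreDir, "journal.log");
        String head;
        try (JournalReader reader = new JournalReader(journal)) {
            head = reader.next().getRevision() + " root " + System.currentTimeMillis();
        }
        try (JournalFileWriter writer = journal.openJournalWriter()) {
            writer.truncate();      // drop the previous journal content
            writer.writeLine(head); // assumed to append the line terminator itself
        }
    }
}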
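
Usage sketch (illustrative only, not part of this patch): the GC journal gets the same kind of indirection via LocalGCJournalFile. GCJournal is internal, so this sketch assumes the org.apache.jackrabbit.oak.segment.file package, like GcJournalTest above; only the call shapes are taken from the tests, and the persist() parameter names in the comment are assumptions.

package org.apache.jackrabbit.oak.segment.file;

import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;

import java.io.File;
import java.io.IOException;

import org.apache.jackrabbit.oak.segment.RecordId;

class GcJournalSketch {

    static void appendAndRead(File segmentStoreDir) throws IOException {
        // The GC journal is now addressed through a GC journal file object;
        // LocalGCJournalFile is the plain-file implementation used by the tests.
        GCJournal gcJournal = new GCJournal(
                new LocalGCJournalFile(segmentStoreDir, GCJournal.GC_JOURNAL));

        // Call shape copied from GcJournalTest; the parameters are assumed to be
        // (reclaimedSize, repoSize, generation, nodes, root).
        gcJournal.persist(0, 100, newGCGeneration(1, 0, false), 50, RecordId.NULL.toString10());

        // Read back the most recent entry.
        System.out.println("repo size: " + gcJournal.read().getRepoSize());
    }
}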