Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java (revision 1697688) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java (working copy) @@ -18,6 +18,7 @@ import static java.lang.System.currentTimeMillis; +import java.io.Closeable; import java.util.Date; import org.slf4j.Logger; @@ -30,7 +31,7 @@ * This class also measures and logs the time taken by the Runnable.run() * method. */ -class BackgroundThread extends Thread { +class BackgroundThread extends Thread implements Closeable { /** Logger instance */ private static final Logger log = @@ -89,7 +90,8 @@ trigger(false); } - void close() { + @Override + public void close() { try { trigger(true); join(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision 1697688) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (working copy) @@ -27,6 +27,7 @@ import static java.lang.String.format; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; +import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly; import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum; @@ -57,8 +58,8 @@ import com.google.common.collect.Maps; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; -import org.apache.jackrabbit.oak.plugins.segment.Compactor; import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; +import org.apache.jackrabbit.oak.plugins.segment.Compactor; import org.apache.jackrabbit.oak.plugins.segment.PersistedCompactionMap; import org.apache.jackrabbit.oak.plugins.segment.RecordId; import org.apache.jackrabbit.oak.plugins.segment.Segment; @@ -292,14 +293,14 @@ @Nonnull public FileStore create() throws IOException { return new FileStore( - blobStore, directory, root, maxFileSize, cacheSize, memoryMapping, gcMonitor); + blobStore, directory, root, maxFileSize, cacheSize, memoryMapping, gcMonitor, false); } } @Deprecated public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB, boolean memoryMapping) throws IOException { - this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0, memoryMapping, GCMonitor.EMPTY); + this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0, memoryMapping, GCMonitor.EMPTY, false); } @Deprecated @@ -317,26 +318,33 @@ @Deprecated public FileStore(File directory, int maxFileSizeMB, int cacheSizeMB, boolean memoryMapping) throws IOException { - this(null, directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB, memoryMapping, GCMonitor.EMPTY); + this(null, directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB, memoryMapping, GCMonitor.EMPTY, false); } @Deprecated FileStore(File directory, NodeState initial, int maxFileSize) throws IOException { - this(null, directory, initial, maxFileSize, -1, MEMORY_MAPPING_DEFAULT, GCMonitor.EMPTY); + this(null, directory, initial, maxFileSize, -1, MEMORY_MAPPING_DEFAULT, GCMonitor.EMPTY, false); } @Deprecated public FileStore( BlobStore blobStore, final File directory, NodeState initial, int maxFileSizeMB, int cacheSizeMB, boolean memoryMapping) throws IOException { - this(blobStore, directory, initial, maxFileSizeMB, cacheSizeMB, memoryMapping, GCMonitor.EMPTY); + this(blobStore, directory, initial, maxFileSizeMB, cacheSizeMB, memoryMapping, GCMonitor.EMPTY, false); } private FileStore( BlobStore blobStore, final File directory, NodeState initial, int maxFileSizeMB, - int cacheSizeMB, boolean memoryMapping, GCMonitor gcMonitor) + int cacheSizeMB, boolean memoryMapping, GCMonitor gcMonitor, boolean readonly) throws IOException { - checkNotNull(directory).mkdirs(); + + if (readonly) { + checkNotNull(directory); + checkState(directory.exists() && directory.isDirectory()); + } else { + checkNotNull(directory).mkdirs(); + } + if (cacheSizeMB < 0) { this.tracker = new SegmentTracker(this, 0, getVersion()); } else if (cacheSizeMB > 0) { @@ -350,26 +358,40 @@ this.memoryMapping = memoryMapping; this.gcMonitor = gcMonitor; - journalFile = new RandomAccessFile(new File(directory, JOURNAL_FILE_NAME), "rw"); - lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw"); + if (readonly) { + journalFile = new RandomAccessFile(new File(directory, + JOURNAL_FILE_NAME), "r"); + } else { + journalFile = new RandomAccessFile(new File(directory, + JOURNAL_FILE_NAME), "rw"); + } Map> map = collectFiles(directory); this.readers = newArrayListWithCapacity(map.size()); Integer[] indices = map.keySet().toArray(new Integer[map.size()]); Arrays.sort(indices); for (int i = indices.length - 1; i >= 0; i--) { - readers.add(TarReader.open(map.get(indices[i]), memoryMapping)); + if (!readonly) { + readers.add(TarReader.open(map.get(indices[i]), memoryMapping)); + } else { + // only try to read-only recover the latest file as that might + // be the *only* one still being accessed by a writer + boolean recover = i == indices.length - 1; + readers.add(TarReader.openRO(map.get(indices[i]), + memoryMapping, recover)); + } } - if (indices.length > 0) { - this.writeNumber = indices[indices.length - 1] + 1; - } else { - this.writeNumber = 0; + if (!readonly) { + if (indices.length > 0) { + this.writeNumber = indices[indices.length - 1] + 1; + } else { + this.writeNumber = 0; + } + this.writeFile = new File(directory, String.format( + FILE_NAME_FORMAT, writeNumber, "a")); + this.writer = new TarWriter(writeFile); } - this.writeFile = new File( - directory, - String.format(FILE_NAME_FORMAT, writeNumber, "a")); - this.writer = new TarWriter(writeFile); RecordId id = null; JournalReader journalReader = new JournalReader(new File(directory, JOURNAL_FILE_NAME)); @@ -391,7 +413,15 @@ } journalFile.seek(journalFile.length()); - lock = lockFile.getChannel().lock(); + + if (!readonly) { + lockFile = new RandomAccessFile( + new File(directory, LOCK_FILE_NAME), "rw"); + lock = lockFile.getChannel().lock(); + } else { + lockFile = null; + lock = null; + } if (id != null) { head = new AtomicReference(id); @@ -404,29 +434,39 @@ persistedHead = new AtomicReference(null); } - this.flushThread = new BackgroundThread( - "TarMK flush thread [" + directory + "]", 5000, // 5s interval - new Runnable() { - @Override - public void run() { - try { - flush(); - } catch (IOException e) { - log.warn("Failed to flush the TarMK at" + - directory, e); + if (!readonly) { + this.flushThread = new BackgroundThread( + "TarMK flush thread [" + directory + "]", 5000, // 5s interval + new Runnable() { + @Override + public void run() { + try { + flush(); + } catch (IOException e) { + log.warn("Failed to flush the TarMK at" + + directory, e); + } } - } - }); - this.compactionThread = new BackgroundThread( - "TarMK compaction thread [" + directory + "]", -1, - new Runnable() { - @Override - public void run() { - maybeCompact(true); - } - }); + }); + this.compactionThread = new BackgroundThread( + "TarMK compaction thread [" + directory + "]", -1, + new Runnable() { + @Override + public void run() { + maybeCompact(true); + } + }); + } else { + this.flushThread = null; + this.compactionThread = null; + } - log.info("TarMK opened: {} (mmap={})", directory, memoryMapping); + if (readonly) { + log.info("TarMK ReadOnly opened: {} (mmap={})", directory, + memoryMapping); + } else { + log.info("TarMK opened: {} (mmap={})", directory, memoryMapping); + } } public boolean maybeCompact(boolean cleanup) { @@ -586,7 +626,10 @@ * @return number of segments */ private synchronized int count() { - int count = writer.count(); + int count = 0; + if (writer != null) { + count += writer.count(); + } for (TarReader reader : readers) { count += reader.count(); } @@ -690,7 +733,8 @@ if (cleaned != null) { list.add(cleaned); } - File file = reader.close(); + closeQuietly(reader); + File file = reader.getFile(); gcMonitor.info("TarMK revision cleanup reclaiming {}", file.getName()); toBeRemoved.addLast(file); } @@ -786,10 +830,12 @@ public synchronized Iterable getSegmentIds() { List ids = newArrayList(); - for (UUID uuid : writer.getUUIDs()) { - ids.add(tracker.getSegmentId( - uuid.getMostSignificantBits(), - uuid.getLeastSignificantBits())); + if (writer != null) { + for (UUID uuid : writer.getUUIDs()) { + ids.add(tracker.getSegmentId( + uuid.getMostSignificantBits(), + uuid.getLeastSignificantBits())); + } } for (TarReader reader : readers) { for (UUID uuid : reader.getUUIDs()) { @@ -822,25 +868,30 @@ public void close() { // avoid deadlocks by closing (and joining) the background // threads before acquiring the synchronization lock - compactionThread.close(); - flushThread.close(); + closeQuietly(compactionThread); + closeQuietly(flushThread); synchronized (this) { try { flush(); - writer.close(); + if (writer != null) { + writer.close(); + } tracker.getWriter().dropCache(); List list = readers; readers = newArrayList(); for (TarReader reader : list) { - reader.close(); + closeQuietly(reader); } - lock.release(); - lockFile.close(); - journalFile.close(); + if (lock != null) { + // in java 1.7 this is also a 'Closeable' + lock.release(); + } + closeQuietly(lockFile); + closeQuietly(journalFile); } catch (IOException e) { throw new RuntimeException( "Failed to close the TarMK at " + directory, e); @@ -870,9 +921,11 @@ } } - synchronized (this) { - if (writer.containsEntry(msb, lsb)) { - return true; + if (writer != null) { + synchronized (this) { + if (writer.containsEntry(msb, lsb)) { + return true; + } } } @@ -910,14 +963,16 @@ } } - synchronized (this) { - try { - ByteBuffer buffer = writer.readEntry(msb, lsb); - if (buffer != null) { - return new Segment(tracker, id, buffer); + if (writer != null) { + synchronized (this) { + try { + ByteBuffer buffer = writer.readEntry(msb, lsb); + if (buffer != null) { + return new Segment(tracker, id, buffer); + } + } catch (IOException e) { + log.warn("Failed to read from tar file " + writer, e); } - } catch (IOException e) { - log.warn("Failed to read from tar file " + writer, e); } } @@ -1045,9 +1100,8 @@ */ public static class ReadOnlyStore extends FileStore { public ReadOnlyStore(File directory) throws IOException { - super(directory, 266); - super.flushThread.close(); - super.compactionThread.close(); + super(null, directory, EMPTY_NODE, -1, 0, MEMORY_MAPPING_DEFAULT, + GCMonitor.EMPTY, true); } /** Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (revision 1697688) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (working copy) @@ -29,6 +29,7 @@ import static org.apache.jackrabbit.oak.plugins.segment.SegmentId.isDataSegmentId; import static org.apache.jackrabbit.oak.plugins.segment.file.TarWriter.GRAPH_MAGIC; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -50,7 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class TarReader { +class TarReader implements Closeable { /** Logger instance */ private static final Logger log = LoggerFactory.getLogger(TarReader.class); @@ -117,23 +118,90 @@ log.warn("Could not find a valid tar index in {}, recovering...", list); LinkedHashMap entries = newLinkedHashMap(); for (File file : sorted.values()) { - log.info("Recovering segments from tar file {}", file); + collectFileEntries(file, entries, true); + } + + // regenerate the first generation based on the recovered data + File file = sorted.values().iterator().next(); + generateTarFile(entries, file); + + reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping); + if (reader != null) { + return reader; + } else { + throw new IOException("Failed to open recovered tar file " + file); + } + } + + static TarReader openRO(Map files, boolean memoryMapping, + boolean recover) throws IOException { + // for readonly store only try the latest generation of a given + // tar file to prevent any rollback or rewrite + File file = files.get(Collections.max(files.keySet())); + + TarReader reader = openFirstFileWithValidIndex(singletonList(file), + memoryMapping); + if (reader != null) { + return reader; + } + if (recover) { + log.warn( + "Could not find a valid tar index in {}, recovering read-only", + file); + // collecting the entries (without touching the original file) and + // writing them into an artificial tar file '.ro.bak' + LinkedHashMap entries = newLinkedHashMap(); + collectFileEntries(file, entries, false); + file = findAvailGen(file, ".ro.bak"); + generateTarFile(entries, file); + reader = openFirstFileWithValidIndex(singletonList(file), + memoryMapping); + if (reader != null) { + return reader; + } + } + + throw new IOException("Failed to open tar file " + file); + } + + /** + * Collects all entries from the given file and optionally backs-up the + * file, by renaming it to a ".bak" extension + * + * @param file + * @param entries + * @param backup + * @throws IOException + */ + private static void collectFileEntries(File file, + LinkedHashMap entries, boolean backup) + throws IOException { + log.info("Recovering segments from tar file {}", file); + try { + RandomAccessFile access = new RandomAccessFile(file, "r"); try { - RandomAccessFile access = new RandomAccessFile(file, "r"); - try { - recoverEntries(file, access, entries); - } finally { - access.close(); - } - } catch (IOException e) { - log.warn("Could not read tar file " + file + ", skipping...", e); + recoverEntries(file, access, entries); + } finally { + access.close(); } + } catch (IOException e) { + log.warn("Could not read tar file " + file + ", skipping...", e); + } + if (backup) { backupSafely(file); } + } - // regenerate the first generation based on the recovered data - File file = sorted.values().iterator().next(); + /** + * Regenerates a tar file from a list of entries. + * + * @param entries + * @param file + * @throws IOException + */ + private static void generateTarFile(LinkedHashMap entries, + File file) throws IOException { log.info("Regenerating tar file " + file); TarWriter writer = new TarWriter(file); for (Map.Entry entry : entries.entrySet()) { @@ -145,13 +213,6 @@ data, 0, data.length); } writer.close(); - - reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping); - if (reader != null) { - return reader; - } else { - throw new IOException("Failed to open recovered tar file " + file); - } } /** @@ -163,14 +224,7 @@ * @throws IOException */ private static void backupSafely(File file) throws IOException { - File parent = file.getParentFile(); - String name = file.getName(); - - File backup = new File(parent, name + ".bak"); - for (int i = 2; backup.exists(); i++) { - backup = new File(parent, name + "." + i + ".bak"); - } - + File backup = findAvailGen(file, ".bak"); log.info("Backing up " + file + " to " + backup.getName()); if (!file.renameTo(backup)) { log.warn("Renaming failed, so using copy to backup {}", file); @@ -182,6 +236,23 @@ } } + /** + * Fine next available generation number so that a generated file doesn't + * overwrite another existing file. + * + * @param file + * @throws IOException + */ + private static File findAvailGen(File file, String ext) { + File parent = file.getParentFile(); + String name = file.getName(); + File backup = new File(parent, name + ext); + for (int i = 2; backup.exists(); i++) { + backup = new File(parent, name + "." + i + ext); + } + return backup; + } + private static TarReader openFirstFileWithValidIndex(List files, boolean memoryMapping) { for (File file : files) { String name = file.getName(); @@ -705,10 +776,10 @@ return closed; } - File close() throws IOException { + @Override + public void close() throws IOException { closed = true; access.close(); - return file; } //-----------------------------------------------------------< private >-- Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java (revision 1697688) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java (working copy) @@ -29,6 +29,7 @@ import static org.apache.jackrabbit.oak.plugins.segment.Segment.REF_COUNT_OFFSET; import static org.apache.jackrabbit.oak.plugins.segment.SegmentId.isDataSegmentId; +import java.io.Closeable; import java.io.File; import java.io.FileDescriptor; import java.io.IOException; @@ -51,7 +52,7 @@ * A writer for tar files. It is also used to read entries while the file is * still open. */ -class TarWriter { +class TarWriter implements Closeable { /** Logger instance */ private static final Logger log = LoggerFactory.getLogger(TarWriter.class); @@ -287,7 +288,8 @@ * * @throws IOException if the tar file could not be closed */ - void close() throws IOException { + @Override + public void close() throws IOException { // Mark this writer as closed. Note that we only need to synchronize // this part, as no other synchronized methods should get invoked // once close() has been initiated (see related checkState calls). Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java (revision 1697688) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java (working copy) @@ -43,6 +43,7 @@ import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder; import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter; +import org.apache.jackrabbit.oak.plugins.segment.file.FileStore.ReadOnlyStore; import org.junit.Before; import org.junit.Test; @@ -256,4 +257,26 @@ } } + @Test + public void nonBlockingROStore() throws IOException { + FileStore store = new FileStore(directory, 1, false); + store.flush(); // first 1kB + SegmentNodeState base = store.getHead(); + SegmentNodeBuilder builder = base.builder(); + builder.setProperty("step", "a"); + store.setHead(base, builder.getNodeState()); + store.flush(); // second 1kB + + ReadOnlyStore ro = null; + try { + ro = new ReadOnlyStore(directory); + assertEquals(store.getHead(), ro.getHead()); + } finally { + if (ro != null) { + ro.close(); + } + store.close(); + } + } + }