Index: lucene/core/src/java/org/apache/lucene/index/PersistentSnapshotDeletionPolicy.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/PersistentSnapshotDeletionPolicy.java (revision 1478459) +++ lucene/core/src/java/org/apache/lucene/index/PersistentSnapshotDeletionPolicy.java (working copy) @@ -17,14 +17,20 @@ * the License. */ -import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map.Entry; +import java.util.Map; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.StoredField; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.Version; /** @@ -33,66 +39,31 @@ * are persisted in a {@link Directory} and are committed as soon as * {@link #snapshot()} or {@link #release(IndexCommit)} is called. *

- * NOTE: this class receives a {@link Directory} to persist the data into - * a Lucene index. It is highly recommended to use a dedicated directory (and on - * stable storage as well) for persisting the snapshots' information, and not - * reuse the content index directory, or otherwise conflicts and index - * corruption will occur. - *

- * NOTE: you should call {@link #close()} when you're done using this - * class for safety (it will close the {@link IndexWriter} instance used). - *

* NOTE: Sharing {@link PersistentSnapshotDeletionPolicy}s that write to * the same directory across {@link IndexWriter}s will corrupt snapshots. You * should make sure every {@link IndexWriter} has its own * {@link PersistentSnapshotDeletionPolicy} and that they all write to a - * different {@link Directory}. + * different {@link Directory}. It is OK to use the same + * Directory that holds the index. * *

This class adds a {@link #release(long)} method to * release commits from a previous snapshot's {@link IndexCommit#getGeneration}. * * @lucene.experimental */ -public class PersistentSnapshotDeletionPolicy extends SnapshotDeletionPolicy implements Closeable { +public class PersistentSnapshotDeletionPolicy extends SnapshotDeletionPolicy { - // Used to validate that the given directory includes just one document w/ the - // given gen field. Otherwise, it's not a valid Directory for snapshotting. - private static final String SNAPSHOTS_GENS = "$SNAPSHOTS_DOC$"; + private static final String SNAPSHOTS_PREFIX = "snapshots_"; + private static final int VERSION_START = 0; + private static final int VERSION_CURRENT = VERSION_START; + private static final String CODEC_NAME = "snapshots"; // The index writer which maintains the snapshots metadata - private final IndexWriter writer; + private long nextWriteGen; + private final Directory dir; + /** - * Reads the snapshots information from the given {@link Directory}. This - * method can be used if the snapshots information is needed, however you - * cannot instantiate the deletion policy (because e.g., some other process - * keeps a lock on the snapshots directory). - */ - private void loadPriorSnapshots(Directory dir) throws IOException { - IndexReader r = DirectoryReader.open(dir); - try { - int numDocs = r.numDocs(); - // index is allowed to have exactly one document or 0. 
- if (numDocs == 1) { - StoredDocument doc = r.document(r.maxDoc() - 1); - if (doc.getField(SNAPSHOTS_GENS) == null) { - throw new IllegalStateException("directory is not a valid snapshots store!"); - } - for (StorableField f : doc) { - if (!f.name().equals(SNAPSHOTS_GENS)) { - refCounts.put(Long.parseLong(f.name()), Integer.parseInt(f.stringValue())); - } - } - } else if (numDocs != 0) { - throw new IllegalStateException( - "should be at most 1 document in the snapshots directory: " + numDocs); - } - } finally { - r.close(); - } - } - - /** * {@link PersistentSnapshotDeletionPolicy} wraps another * {@link IndexDeletionPolicy} to enable flexible snapshotting. * @@ -112,29 +83,19 @@ * IndexWriter. */ public PersistentSnapshotDeletionPolicy(IndexDeletionPolicy primary, - Directory dir, OpenMode mode, Version matchVersion) throws IOException { + Directory dir, OpenMode mode) throws IOException { super(primary); - // Initialize the index writer over the snapshot directory. - writer = new IndexWriter(dir, new IndexWriterConfig(matchVersion, null).setOpenMode(mode)); - if (mode != OpenMode.APPEND) { - // IndexWriter no longer creates a first commit on an empty Directory. So - // if we were asked to CREATE*, call commit() just to be sure. If the - // index contains information and mode is CREATE_OR_APPEND, it's a no-op. - writer.commit(); + this.dir = dir; + + if (mode == OpenMode.CREATE) { + clearPriorSnapshots(); } - try { - // Initializes the snapshots information. This code should basically run - // only if mode != CREATE, but if it is, it's no harm as we only open the - // reader once and immediately close it. 
- loadPriorSnapshots(dir); - } catch (RuntimeException e) { - writer.close(); // don't leave any open file handles - throw e; - } catch (IOException e) { - writer.close(); // don't leave any open file handles - throw e; + loadPriorSnapshots(); + + if (mode == OpenMode.APPEND && nextWriteGen == 0) { + throw new IllegalStateException("no snapshots stored in this directory"); } } @@ -147,7 +108,19 @@ @Override public synchronized IndexCommit snapshot() throws IOException { IndexCommit ic = super.snapshot(); - persist(); + boolean success = false; + try { + persist(); + success = true; + } finally { + if (!success) { + try { + super.release(ic); + } catch (Exception e) { + // Suppress so we keep throwing original exception + } + } + } return ic; } @@ -160,7 +133,19 @@ @Override public synchronized void release(IndexCommit commit) throws IOException { super.release(commit); - persist(); + boolean success = false; + try { + persist(); + success = true; + } finally { + if (!success) { + try { + incRef(commit); + } catch (Exception e) { + // Suppress so we keep throwing original exception + } + } + } } /** @@ -175,23 +160,100 @@ persist(); } - /** Closes the index which writes the snapshots to the directory. 
*/ - public void close() throws IOException { - writer.close(); + synchronized private void persist() throws IOException { + String fileName = SNAPSHOTS_PREFIX + nextWriteGen; + IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT); + boolean success = false; + try { + CodecUtil.writeHeader(out, CODEC_NAME, VERSION_CURRENT); + out.writeVInt(refCounts.size()); + for(Entry<Long,Integer> ent : refCounts.entrySet()) { + out.writeVLong(ent.getKey()); + out.writeVInt(ent.getValue()); + } + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(out); + try { + dir.deleteFile(fileName); + } catch (Exception e) { + // Suppress so we keep throwing original exception + } + } else { + IOUtils.close(out); + } + } + + nextWriteGen++; } + private synchronized void clearPriorSnapshots() throws IOException { + for(String file : dir.listAll()) { + if (file.startsWith(SNAPSHOTS_PREFIX)) { + dir.deleteFile(file); + } + } + } + /** - * Persists all snapshots information. + * Loads the newest snapshots file found in the snapshots {@link Directory} + * (the highest-numbered {@code snapshots_N} file), deletes the older + * snapshots files it passed over on the way, and sets {@code nextWriteGen} + * to one past the generation that was loaded. 
*/ - private void persist() throws IOException { - writer.deleteAll(); - Document d = new Document(); - d.add(new StoredField(SNAPSHOTS_GENS, "")); - for (Entry<Long,Integer> e : refCounts.entrySet()) { - d.add(new StoredField(e.getKey().toString(), e.getValue().toString())); + private synchronized void loadPriorSnapshots() throws IOException { + long genLoaded = -1; + IOException ioe = null; + List<String> snapshotFiles = new ArrayList<String>(); + for(String file : dir.listAll()) { + if (file.startsWith(SNAPSHOTS_PREFIX)) { + long gen = Long.parseLong(file.substring(SNAPSHOTS_PREFIX.length())); + if (genLoaded == -1 || gen > genLoaded) { + snapshotFiles.add(file); + Map<Long,Integer> m = new HashMap<Long,Integer>(); + IndexInput in = dir.openInput(file, IOContext.DEFAULT); + try { + CodecUtil.checkHeader(in, CODEC_NAME, VERSION_START, VERSION_START); + int count = in.readVInt(); + for(int i=0;i<count;i++) { + long commitGen = in.readVLong(); + int refCount = in.readVInt(); + m.put(commitGen, refCount); + } + } catch (IOException ioe2) { + // Save first exception & throw in the end + if (ioe == null) { + ioe = ioe2; + } + } finally { + in.close(); + } + + genLoaded = gen; + refCounts.clear(); + refCounts.putAll(m); + } + } } - writer.addDocument(d); - writer.commit(); + + if (genLoaded == -1) { + // Nothing was loaded... + if (ioe != null) { + // ... not for lack of trying: + throw ioe; + } + } else { + if (snapshotFiles.size() > 1) { + // Remove any broken / old snapshot files: + String curFileName = SNAPSHOTS_PREFIX + genLoaded; + for(String file : snapshotFiles) { + if (!curFileName.equals(file)) { + dir.deleteFile(file); + } + } + } + nextWriteGen = 1+genLoaded; + } } - } Index: lucene/core/src/java/org/apache/lucene/index/SnapshotDeletionPolicy.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SnapshotDeletionPolicy.java (revision 1478459) +++ lucene/core/src/java/org/apache/lucene/index/SnapshotDeletionPolicy.java (working copy) @@ -111,6 +111,19 @@ } } + protected synchronized void incRef(IndexCommit ic) { + long gen = ic.getGeneration(); + Integer refCount = refCounts.get(gen); + int refCountInt; + if (refCount == null) { + indexCommits.put(gen, lastCommit); + refCountInt = 0; + } else { + refCountInt = refCount.intValue(); + } + refCounts.put(gen, refCountInt+1); + } + /** * Snapshots the last commit and returns it. Once a commit is 'snapshotted,' it is protected * from deletion (as long as this {@link IndexDeletionPolicy} is used). 
The @@ -134,19 +147,8 @@ throw new IllegalStateException("No index commit to snapshot"); } - long gen = lastCommit.getGeneration(); + incRef(lastCommit); - Integer refCount = refCounts.get(gen); - int refCountInt; - if (refCount == null) { - indexCommits.put(gen, lastCommit); - refCountInt = 0; - } else { - refCountInt = refCount.intValue(); - } - - refCounts.put(gen, refCountInt+1); - return lastCommit; } Index: lucene/core/src/test/org/apache/lucene/index/TestPersistentSnapshotDeletionPolicy.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestPersistentSnapshotDeletionPolicy.java (revision 1478459) +++ lucene/core/src/test/org/apache/lucene/index/TestPersistentSnapshotDeletionPolicy.java (working copy) @@ -22,7 +22,6 @@ import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.LockObtainFailedException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -32,9 +31,6 @@ // Keep it a class member so that getDeletionPolicy can use it private Directory snapshotDir; - // so we can close it if called by SDP tests - private PersistentSnapshotDeletionPolicy psdp; - @Before @Override public void setUp() throws Exception { @@ -45,19 +41,16 @@ @After @Override public void tearDown() throws Exception { - if (psdp != null) psdp.close(); snapshotDir.close(); super.tearDown(); } @Override protected SnapshotDeletionPolicy getDeletionPolicy() throws IOException { - if (psdp != null) psdp.close(); snapshotDir.close(); snapshotDir = newDirectory(); - return psdp = new PersistentSnapshotDeletionPolicy( - new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.CREATE, - TEST_VERSION_CURRENT); + return new PersistentSnapshotDeletionPolicy( + new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.CREATE); } @Test @@ -68,19 +61,16 @@ 
PersistentSnapshotDeletionPolicy psdp = (PersistentSnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy(); prepareIndexAndSnapshots(psdp, writer, numSnapshots); writer.close(); - psdp.close(); // Re-initialize and verify snapshots were persisted psdp = new PersistentSnapshotDeletionPolicy( - new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND, - TEST_VERSION_CURRENT); + new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND); IndexWriter iw = new IndexWriter(dir, getConfig(random(), psdp)); psdp = (PersistentSnapshotDeletionPolicy) iw.getConfig().getIndexDeletionPolicy(); iw.close(); assertSnapshotExists(dir, psdp, numSnapshots, false); - psdp.close(); dir.close(); } @@ -92,8 +82,7 @@ writer.close(); try { new PersistentSnapshotDeletionPolicy( - new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND, - TEST_VERSION_CURRENT); + new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND); fail("should not have succeeded to read from an invalid Directory"); } catch (IllegalStateException e) { // expected @@ -102,11 +91,8 @@ @Test public void testNoSnapshotInfos() throws Exception { - // Initialize an empty index in snapshotDir - PSDP should initialize successfully. 
- new IndexWriter(snapshotDir, getConfig(random(), null)).close(); new PersistentSnapshotDeletionPolicy( - new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND, - TEST_VERSION_CURRENT).close(); + new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.CREATE); } @Test(expected=IllegalStateException.class) @@ -118,8 +104,7 @@ writer.close(); new PersistentSnapshotDeletionPolicy( - new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND, - TEST_VERSION_CURRENT).close(); + new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND); fail("should not have succeeded to open an invalid directory"); } @@ -132,13 +117,10 @@ writer.close(); psdp.release(snapshots.get(0)); - psdp.close(); psdp = new PersistentSnapshotDeletionPolicy( - new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND, - TEST_VERSION_CURRENT); + new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND); assertEquals("Should have no snapshots !", 0, psdp.getSnapshotCount()); - psdp.close(); dir.close(); } @@ -151,39 +133,10 @@ writer.close(); psdp.release(snapshots.get(0).getGeneration()); - psdp.close(); psdp = new PersistentSnapshotDeletionPolicy( - new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND, - TEST_VERSION_CURRENT); + new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND); assertEquals("Should have no snapshots !", 0, psdp.getSnapshotCount()); - psdp.close(); dir.close(); } - - @Test - public void testStaticRead() throws Exception { - // While PSDP is open, it keeps a lock on the snapshots directory and thus - // prevents reading the snapshots information. This test checks that the - // static read method works. 
- int numSnapshots = 1; - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, getConfig(random(), getDeletionPolicy())); - PersistentSnapshotDeletionPolicy psdp = (PersistentSnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy(); - prepareIndexAndSnapshots(psdp, writer, numSnapshots); - writer.close(); - dir.close(); - - try { - // This should fail, since the snapshots directory is locked - we didn't close it ! - new PersistentSnapshotDeletionPolicy( - new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND, - TEST_VERSION_CURRENT); - fail("should not have reached here - the snapshots directory should be locked!"); - } catch (LockObtainFailedException e) { - // expected - } finally { - psdp.close(); - } - } }