Index: lucene/core/src/java/org/apache/lucene/index/PersistentSnapshotDeletionPolicy.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/PersistentSnapshotDeletionPolicy.java (revision 1478459)
+++ lucene/core/src/java/org/apache/lucene/index/PersistentSnapshotDeletionPolicy.java (working copy)
@@ -17,14 +17,20 @@
* the License.
*/
-import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map.Entry;
+import java.util.Map;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.StoredField;
+import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.Version;
/**
@@ -33,66 +39,31 @@
* are persisted in a {@link Directory} and are committed as soon as
* {@link #snapshot()} or {@link #release(IndexCommit)} is called.
*
- * NOTE: this class receives a {@link Directory} to persist the data into
- * a Lucene index. It is highly recommended to use a dedicated directory (and on
- * stable storage as well) for persisting the snapshots' information, and not
- * reuse the content index directory, or otherwise conflicts and index
- * corruption will occur.
- *
- * NOTE: you should call {@link #close()} when you're done using this
- * class for safety (it will close the {@link IndexWriter} instance used).
- *
* NOTE: Sharing {@link PersistentSnapshotDeletionPolicy}s that write to
* the same directory across {@link IndexWriter}s will corrupt snapshots. You
* should make sure every {@link IndexWriter} has its own
* {@link PersistentSnapshotDeletionPolicy} and that they all write to a
- * different {@link Directory}.
+ * different {@link Directory}. It is OK to use the same
+ * Directory that holds the index.
*
*
This class adds a {@link #release(long)} method to
* release commits from a previous snapshot's {@link IndexCommit#getGeneration}.
*
* @lucene.experimental
*/
-public class PersistentSnapshotDeletionPolicy extends SnapshotDeletionPolicy implements Closeable {
+public class PersistentSnapshotDeletionPolicy extends SnapshotDeletionPolicy {
- // Used to validate that the given directory includes just one document w/ the
- // given gen field. Otherwise, it's not a valid Directory for snapshotting.
- private static final String SNAPSHOTS_GENS = "$SNAPSHOTS_DOC$";
+ private static final String SNAPSHOTS_PREFIX = "snapshots_";
+ private static final int VERSION_START = 0;
+ private static final int VERSION_CURRENT = VERSION_START;
+ private static final String CODEC_NAME = "snapshots";
// The index writer which maintains the snapshots metadata
- private final IndexWriter writer;
+ private long nextWriteGen;
+ private final Directory dir;
+
/**
- * Reads the snapshots information from the given {@link Directory}. This
- * method can be used if the snapshots information is needed, however you
- * cannot instantiate the deletion policy (because e.g., some other process
- * keeps a lock on the snapshots directory).
- */
- private void loadPriorSnapshots(Directory dir) throws IOException {
- IndexReader r = DirectoryReader.open(dir);
- try {
- int numDocs = r.numDocs();
- // index is allowed to have exactly one document or 0.
- if (numDocs == 1) {
- StoredDocument doc = r.document(r.maxDoc() - 1);
- if (doc.getField(SNAPSHOTS_GENS) == null) {
- throw new IllegalStateException("directory is not a valid snapshots store!");
- }
- for (StorableField f : doc) {
- if (!f.name().equals(SNAPSHOTS_GENS)) {
- refCounts.put(Long.parseLong(f.name()), Integer.parseInt(f.stringValue()));
- }
- }
- } else if (numDocs != 0) {
- throw new IllegalStateException(
- "should be at most 1 document in the snapshots directory: " + numDocs);
- }
- } finally {
- r.close();
- }
- }
-
- /**
* {@link PersistentSnapshotDeletionPolicy} wraps another
* {@link IndexDeletionPolicy} to enable flexible snapshotting.
*
@@ -112,29 +83,19 @@
* IndexWriter.
*/
public PersistentSnapshotDeletionPolicy(IndexDeletionPolicy primary,
- Directory dir, OpenMode mode, Version matchVersion) throws IOException {
+ Directory dir, OpenMode mode) throws IOException {
super(primary);
- // Initialize the index writer over the snapshot directory.
- writer = new IndexWriter(dir, new IndexWriterConfig(matchVersion, null).setOpenMode(mode));
- if (mode != OpenMode.APPEND) {
- // IndexWriter no longer creates a first commit on an empty Directory. So
- // if we were asked to CREATE*, call commit() just to be sure. If the
- // index contains information and mode is CREATE_OR_APPEND, it's a no-op.
- writer.commit();
+ this.dir = dir;
+
+ if (mode == OpenMode.CREATE) {
+ clearPriorSnapshots();
}
- try {
- // Initializes the snapshots information. This code should basically run
- // only if mode != CREATE, but if it is, it's no harm as we only open the
- // reader once and immediately close it.
- loadPriorSnapshots(dir);
- } catch (RuntimeException e) {
- writer.close(); // don't leave any open file handles
- throw e;
- } catch (IOException e) {
- writer.close(); // don't leave any open file handles
- throw e;
+ loadPriorSnapshots();
+
+ if (mode == OpenMode.APPEND && nextWriteGen == 0) {
+ throw new IllegalStateException("no snapshots stored in this directory");
}
}
@@ -147,7 +108,19 @@
@Override
public synchronized IndexCommit snapshot() throws IOException {
IndexCommit ic = super.snapshot();
- persist();
+ boolean success = false;
+ try {
+ persist();
+ success = true;
+ } finally {
+ if (!success) {
+ try {
+ super.release(ic);
+ } catch (Exception e) {
+ // Suppress so we keep throwing original exception
+ }
+ }
+ }
return ic;
}
@@ -160,7 +133,19 @@
@Override
public synchronized void release(IndexCommit commit) throws IOException {
super.release(commit);
- persist();
+ boolean success = false;
+ try {
+ persist();
+ success = true;
+ } finally {
+ if (!success) {
+ try {
+ incRef(commit);
+ } catch (Exception e) {
+ // Suppress so we keep throwing original exception
+ }
+ }
+ }
}
/**
@@ -175,23 +160,100 @@
persist();
}
- /** Closes the index which writes the snapshots to the directory. */
- public void close() throws IOException {
- writer.close();
+ synchronized private void persist() throws IOException {
+ String fileName = SNAPSHOTS_PREFIX + nextWriteGen;
+ IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT);
+ boolean success = false;
+ try {
+ CodecUtil.writeHeader(out, CODEC_NAME, VERSION_CURRENT);
+ out.writeVInt(refCounts.size());
+ for(Entry ent : refCounts.entrySet()) {
+ out.writeVLong(ent.getKey());
+ out.writeVInt(ent.getValue());
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ IOUtils.closeWhileHandlingException(out);
+ try {
+ dir.deleteFile(fileName);
+ } catch (Exception e) {
+ // Suppress so we keep throwing original exception
+ }
+ } else {
+ IOUtils.close(out);
+ }
+ }
+
+ nextWriteGen++;
}
+ private synchronized void clearPriorSnapshots() throws IOException {
+ for(String file : dir.listAll()) {
+ if (file.startsWith(SNAPSHOTS_PREFIX)) {
+ dir.deleteFile(file);
+ }
+ }
+ }
+
/**
- * Persists all snapshots information.
+ * Reads the snapshots information from the given {@link Directory}. This
+ * method can be used if the snapshots information is needed, however you
+ * cannot instantiate the deletion policy (because e.g., some other process
+ * keeps a lock on the snapshots directory).
*/
- private void persist() throws IOException {
- writer.deleteAll();
- Document d = new Document();
- d.add(new StoredField(SNAPSHOTS_GENS, ""));
- for (Entry e : refCounts.entrySet()) {
- d.add(new StoredField(e.getKey().toString(), e.getValue().toString()));
+ private synchronized void loadPriorSnapshots() throws IOException {
+ long genLoaded = -1;
+ IOException ioe = null;
+ List snapshotFiles = new ArrayList();
+ for(String file : dir.listAll()) {
+ if (file.startsWith(SNAPSHOTS_PREFIX)) {
+ long gen = Long.parseLong(file.substring(SNAPSHOTS_PREFIX.length()));
+ if (genLoaded == -1 || gen > genLoaded) {
+ snapshotFiles.add(file);
+ Map m = new HashMap();
+ IndexInput in = dir.openInput(file, IOContext.DEFAULT);
+ try {
+ CodecUtil.checkHeader(in, CODEC_NAME, VERSION_START, VERSION_START);
+ int count = in.readVInt();
+ for(int i=0;i 1) {
+ // Remove any broken / old snapshot files:
+ String curFileName = SNAPSHOTS_PREFIX + genLoaded;
+ for(String file : snapshotFiles) {
+ if (!curFileName.equals(file)) {
+ dir.deleteFile(file);
+ }
+ }
+ }
+ nextWriteGen = 1+genLoaded;
+ }
}
-
}
Index: lucene/core/src/java/org/apache/lucene/index/SnapshotDeletionPolicy.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/SnapshotDeletionPolicy.java (revision 1478459)
+++ lucene/core/src/java/org/apache/lucene/index/SnapshotDeletionPolicy.java (working copy)
@@ -111,6 +111,19 @@
}
}
+ protected synchronized void incRef(IndexCommit ic) {
+ long gen = ic.getGeneration();
+ Integer refCount = refCounts.get(gen);
+ int refCountInt;
+ if (refCount == null) {
+ indexCommits.put(gen, lastCommit);
+ refCountInt = 0;
+ } else {
+ refCountInt = refCount.intValue();
+ }
+ refCounts.put(gen, refCountInt+1);
+ }
+
/**
* Snapshots the last commit and returns it. Once a commit is 'snapshotted,' it is protected
* from deletion (as long as this {@link IndexDeletionPolicy} is used). The
@@ -134,19 +147,8 @@
throw new IllegalStateException("No index commit to snapshot");
}
- long gen = lastCommit.getGeneration();
+ incRef(lastCommit);
- Integer refCount = refCounts.get(gen);
- int refCountInt;
- if (refCount == null) {
- indexCommits.put(gen, lastCommit);
- refCountInt = 0;
- } else {
- refCountInt = refCount.intValue();
- }
-
- refCounts.put(gen, refCountInt+1);
-
return lastCommit;
}
Index: lucene/core/src/test/org/apache/lucene/index/TestPersistentSnapshotDeletionPolicy.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/index/TestPersistentSnapshotDeletionPolicy.java (revision 1478459)
+++ lucene/core/src/test/org/apache/lucene/index/TestPersistentSnapshotDeletionPolicy.java (working copy)
@@ -22,7 +22,6 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.LockObtainFailedException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -32,9 +31,6 @@
// Keep it a class member so that getDeletionPolicy can use it
private Directory snapshotDir;
- // so we can close it if called by SDP tests
- private PersistentSnapshotDeletionPolicy psdp;
-
@Before
@Override
public void setUp() throws Exception {
@@ -45,19 +41,16 @@
@After
@Override
public void tearDown() throws Exception {
- if (psdp != null) psdp.close();
snapshotDir.close();
super.tearDown();
}
@Override
protected SnapshotDeletionPolicy getDeletionPolicy() throws IOException {
- if (psdp != null) psdp.close();
snapshotDir.close();
snapshotDir = newDirectory();
- return psdp = new PersistentSnapshotDeletionPolicy(
- new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.CREATE,
- TEST_VERSION_CURRENT);
+ return new PersistentSnapshotDeletionPolicy(
+ new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.CREATE);
}
@Test
@@ -68,19 +61,16 @@
PersistentSnapshotDeletionPolicy psdp = (PersistentSnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
prepareIndexAndSnapshots(psdp, writer, numSnapshots);
writer.close();
- psdp.close();
// Re-initialize and verify snapshots were persisted
psdp = new PersistentSnapshotDeletionPolicy(
- new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND,
- TEST_VERSION_CURRENT);
+ new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND);
IndexWriter iw = new IndexWriter(dir, getConfig(random(), psdp));
psdp = (PersistentSnapshotDeletionPolicy) iw.getConfig().getIndexDeletionPolicy();
iw.close();
assertSnapshotExists(dir, psdp, numSnapshots, false);
- psdp.close();
dir.close();
}
@@ -92,8 +82,7 @@
writer.close();
try {
new PersistentSnapshotDeletionPolicy(
- new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND,
- TEST_VERSION_CURRENT);
+ new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND);
fail("should not have succeeded to read from an invalid Directory");
} catch (IllegalStateException e) {
// expected
@@ -102,11 +91,8 @@
@Test
public void testNoSnapshotInfos() throws Exception {
- // Initialize an empty index in snapshotDir - PSDP should initialize successfully.
- new IndexWriter(snapshotDir, getConfig(random(), null)).close();
new PersistentSnapshotDeletionPolicy(
- new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND,
- TEST_VERSION_CURRENT).close();
+ new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.CREATE);
}
@Test(expected=IllegalStateException.class)
@@ -118,8 +104,7 @@
writer.close();
new PersistentSnapshotDeletionPolicy(
- new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND,
- TEST_VERSION_CURRENT).close();
+ new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND);
fail("should not have succeeded to open an invalid directory");
}
@@ -132,13 +117,10 @@
writer.close();
psdp.release(snapshots.get(0));
- psdp.close();
psdp = new PersistentSnapshotDeletionPolicy(
- new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND,
- TEST_VERSION_CURRENT);
+ new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND);
assertEquals("Should have no snapshots !", 0, psdp.getSnapshotCount());
- psdp.close();
dir.close();
}
@@ -151,39 +133,10 @@
writer.close();
psdp.release(snapshots.get(0).getGeneration());
- psdp.close();
psdp = new PersistentSnapshotDeletionPolicy(
- new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND,
- TEST_VERSION_CURRENT);
+ new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND);
assertEquals("Should have no snapshots !", 0, psdp.getSnapshotCount());
- psdp.close();
dir.close();
}
-
- @Test
- public void testStaticRead() throws Exception {
- // While PSDP is open, it keeps a lock on the snapshots directory and thus
- // prevents reading the snapshots information. This test checks that the
- // static read method works.
- int numSnapshots = 1;
- Directory dir = newDirectory();
- IndexWriter writer = new IndexWriter(dir, getConfig(random(), getDeletionPolicy()));
- PersistentSnapshotDeletionPolicy psdp = (PersistentSnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
- prepareIndexAndSnapshots(psdp, writer, numSnapshots);
- writer.close();
- dir.close();
-
- try {
- // This should fail, since the snapshots directory is locked - we didn't close it !
- new PersistentSnapshotDeletionPolicy(
- new KeepOnlyLastCommitDeletionPolicy(), snapshotDir, OpenMode.APPEND,
- TEST_VERSION_CURRENT);
- fail("should not have reached here - the snapshots directory should be locked!");
- } catch (LockObtainFailedException e) {
- // expected
- } finally {
- psdp.close();
- }
- }
}