Index: test/org/apache/lucene/index/TestNewIndexModifierDelete.java
===================================================================
--- test/org/apache/lucene/index/TestNewIndexModifierDelete.java (revision 0)
+++ test/org/apache/lucene/index/TestNewIndexModifierDelete.java (revision 0)
@@ -0,0 +1,213 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+
+public class TestNewIndexModifierDelete extends TestCase {
+
+  // test the simple case
+  public void testSimpleCase() throws IOException {
+    String[] keywords = { "1", "2" };
+    String[] unindexed = { "Netherlands", "Italy" };
+    String[] unstored = { "Amsterdam has lots of bridges",
+        "Venice has lots of canals" };
+    String[] text = { "Amsterdam", "Venice" };
+
+    Directory dir = new RAMDirectory();
+    NewIndexModifier modifier = new NewIndexModifier(dir,
+        new WhitespaceAnalyzer(), true);
+    modifier.setUseCompoundFile(true);
+    modifier.setMaxBufferedDeleteTerms(1);
+
+    for (int i = 0; i < keywords.length; i++) {
+      Document doc = new Document();
+      doc.add(new Field("id", keywords[i], Field.Store.YES,
+          Field.Index.UN_TOKENIZED));
+      doc.add(new Field("country", unindexed[i], Field.Store.YES,
+          Field.Index.NO));
+      doc.add(new Field("contents", unstored[i], Field.Store.NO,
+          Field.Index.TOKENIZED));
+      doc.add(new Field("city", text[i], Field.Store.YES,
+          Field.Index.TOKENIZED));
+      modifier.addDocument(doc);
+    }
+    modifier.optimize();
+
+    Term term = new Term("city", "Amsterdam");
+    int hitCount = getHitCount(dir, term);
+    assertEquals(1, hitCount);
+    modifier.deleteDocuments(term);
+    hitCount = getHitCount(dir, term);
+    assertEquals(0, hitCount);
+
+    modifier.close();
+  }
+
+  // test when delete terms only apply to disk segments
+  public void testNonRAMDelete() throws IOException {
+    Directory dir = new RAMDirectory();
+    NewIndexModifier modifier = new NewIndexModifier(dir,
+        new WhitespaceAnalyzer(), true);
+    modifier.setMaxBufferedDocs(2);
+    modifier.setMaxBufferedDeleteTerms(2);
+
+    int id = 0;
+    int value = 100;
+
+    for (int i = 0; i < 7; i++) {
+      addDoc(modifier, ++id, value);
+    }
+    modifier.flush();
+
+    assertEquals(0, modifier.getRAMSegmentCount());
+    assertTrue(0 < modifier.getSegmentCount());
+
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(7, reader.numDocs());
+    reader.close();
+
+    modifier.deleteDocuments(new Term("value", String.valueOf(value)));
+    modifier.deleteDocuments(new Term("value", String.valueOf(value)));
+
+    reader = IndexReader.open(dir);
+    assertEquals(0, reader.numDocs());
+    reader.close();
+
+    modifier.close();
+  }
+
+  // test when delete terms only apply to ram segments
+  public void testRAMDeletes() throws IOException {
+    Directory dir = new RAMDirectory();
+    NewIndexModifier modifier = new NewIndexModifier(dir,
+        new WhitespaceAnalyzer(), true);
+    modifier.setMaxBufferedDocs(4);
+    modifier.setMaxBufferedDeleteTerms(4);
+
+    int id = 0;
+    int value = 100;
+
+    addDoc(modifier, ++id, value);
+    modifier.deleteDocuments(new Term("value", String.valueOf(value)));
+    addDoc(modifier, ++id, value);
+    modifier.deleteDocuments(new Term("value", String.valueOf(value)));
+
+    assertEquals(2, modifier.getNumBufferedDeleteTerms());
+    assertEquals(1, modifier.getBufferedDeleteTermsSize());
+
+    addDoc(modifier, ++id, value);
+    assertEquals(0, modifier.getSegmentCount());
+    modifier.flush();
+
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(1, reader.numDocs());
+
+    int hitCount = getHitCount(dir, new Term("id", String.valueOf(id)));
+    assertEquals(1, hitCount);
+    reader.close();
+
+    modifier.close();
+  }
+
+  // test when delete terms apply to both disk and ram segments
+  public void testBothDeletes() throws IOException {
+    Directory dir = new RAMDirectory();
+    NewIndexModifier modifier = new NewIndexModifier(dir,
+        new WhitespaceAnalyzer(), true);
+    modifier.setMaxBufferedDocs(100);
+    modifier.setMaxBufferedDeleteTerms(100);
+
+    int id = 0;
+    int value = 100;
+
+    for (int i = 0; i < 5; i++) {
+      addDoc(modifier, ++id, value);
+    }
+
+    value = 200;
+    for (int i = 0; i < 5; i++) {
+      addDoc(modifier, ++id, value);
+    }
+    modifier.flush();
+
+    for (int i = 0; i < 5; i++) {
+      addDoc(modifier, ++id, value);
+    }
+    modifier.deleteDocuments(new Term("value", String.valueOf(value)));
+    modifier.flush();
+
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(5, reader.numDocs());
+
+    modifier.close();
+  }
+
+  // test that batched delete terms are flushed together
+  public void testBatchDeletes() throws IOException {
+    Directory dir = new RAMDirectory();
+    NewIndexModifier modifier = new NewIndexModifier(dir,
+        new WhitespaceAnalyzer(), true);
+    modifier.setMaxBufferedDocs(2);
+    modifier.setMaxBufferedDeleteTerms(2);
+
+    int id = 0;
+    int value = 100;
+
+    for (int i = 0; i < 7; i++) {
+      addDoc(modifier, ++id, value);
+    }
+    modifier.flush();
+
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(7, reader.numDocs());
+    reader.close();
+
+    id = 0;
+    modifier.deleteDocuments(new Term("id", String.valueOf(++id)));
+    modifier.deleteDocuments(new Term("id", String.valueOf(++id)));
+
+    reader = IndexReader.open(dir);
+    assertEquals(5, reader.numDocs());
+    reader.close();
+
+    Term[] terms = new Term[3];
+    for (int i = 0; i < terms.length; i++) {
+      terms[i] = new Term("id", String.valueOf(++id));
+    }
+    modifier.deleteDocuments(terms);
+
+    reader = IndexReader.open(dir);
+    assertEquals(2, reader.numDocs());
+    reader.close();
+
+    modifier.close();
+  }
+
+  private void addDoc(NewIndexModifier modifier, int id, int value)
+      throws IOException {
+    Document doc = new Document();
+    doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED));
+    doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
+        Field.Index.UN_TOKENIZED));
+    doc.add(new Field("value", String.valueOf(value), Field.Store.NO,
+        Field.Index.UN_TOKENIZED));
+    modifier.addDocument(doc);
+  }
+
+  private int getHitCount(Directory dir, Term term) throws IOException {
+    IndexSearcher searcher = new IndexSearcher(dir);
+    int hitCount = searcher.search(new TermQuery(term)).length();
+    searcher.close();
+    return hitCount;
+  }
+}
Index: java/org/apache/lucene/index/NewIndexModifier.java
===================================================================
--- java/org/apache/lucene/index/NewIndexModifier.java (revision 0)
+++ java/org/apache/lucene/index/NewIndexModifier.java (revision 0)
@@ -0,0 +1,231 @@
+package org.apache.lucene.index;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.store.Directory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Vector;
+import java.util.Map.Entry;
+
+/**
+ * NewIndexModifier extends {@link IndexWriter} so that you can not only insert
+ * documents but also delete documents through a single interface. Internally,
+ * inserts and deletes are buffered before they are flushed to disk.
+ *
+ * Design Overview
+ *
+ * The deleteDocuments() method works by buffering terms to be deleted. Deletes
+ * are deferred until ram is flushed to disk, either because enough new documents
+ * or delete terms are buffered, or because close() is called. Using Java
+ * synchronization, care is taken to ensure that an interleaved sequence of
+ * inserts and deletes for the same document is properly serialized.
+ */
+
+public class NewIndexModifier extends IndexWriter {
+  // number of ram segments a delete term applies to
+  class Num {
+    private int num;
+    Num(int num) { this.num = num; }
+    int getNum() { return num; }
+    void setNum(int num) { this.num = num; }
+  }
+
+  /**
+   * Default value is 10. Change using {@link #setMaxBufferedDeleteTerms(int)}.
+   */
+  public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 10;
+  // the max number of delete terms that can be buffered before
+  // they must be flushed to disk
+  private int maxBufferedDeleteTerms = DEFAULT_MAX_BUFFERED_DELETE_TERMS;
+
+  // to buffer delete terms in ram before they are applied
+  // key is delete term, value is number of ram segments the term applies to
+  private HashMap bufferedDeleteTerms = new HashMap();
+  private int numBufferedDeleteTerms = 0;
+
+  /**
+   * @see IndexWriter#IndexWriter(String, Analyzer, boolean)
+   */
+  public NewIndexModifier(String path, Analyzer a, boolean create)
+      throws IOException {
+    super(path, a, create);
+  }
+
+  /**
+   * @see IndexWriter#IndexWriter(File, Analyzer, boolean)
+   */
+  public NewIndexModifier(File path, Analyzer a, boolean create)
+      throws IOException {
+    super(path, a, create);
+  }
+
+  /**
+   * @see IndexWriter#IndexWriter(Directory, Analyzer, boolean)
+   */
+  public NewIndexModifier(Directory d, Analyzer a, boolean create)
+      throws IOException {
+    super(d, a, create);
+  }
+
+  /** Determines the minimal number of delete terms required before the buffered
+   * in-memory delete terms are applied and flushed. If there are documents
+   * buffered in memory at the time, they are merged and a new Segment is
+   * created. The delete terms are applied appropriately.
+   *
+   * The default value is 10.
+ *
+ * @throws IllegalArgumentException if maxBufferedDeleteTerms is smaller than 1
+ */
+ public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
+ if (maxBufferedDeleteTerms < 1)
+ throw new IllegalArgumentException(
+ "maxBufferedDeleteTerms must at least be 1");
+ this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
+ }
+
+ /**
+ * @see #setMaxBufferedDeleteTerms
+ */
+ public int getMaxBufferedDeleteTerms() {
+ return maxBufferedDeleteTerms;
+ }
+
+ // for test purposes
+ final synchronized int getBufferedDeleteTermsSize() {
+ return bufferedDeleteTerms.size();
+ }
+
+ // for test purposes
+ final synchronized int getNumBufferedDeleteTerms() {
+ return numBufferedDeleteTerms;
+ }
+
+ /**
+ * Force all changes to disk.
+ * @throws IOException
+ */
+ public void flush() throws IOException {
+ flushRamSegments();
+ }
+
+ /**
+ * Deletes all documents containing term.
+ */
+ public synchronized void deleteDocuments(Term term) throws IOException {
+ bufferDeleteTerm(term);
+ maybeFlushRamSegments();
+ }
+
+ /**
+ * Deletes all documents containing any of the terms. All deletes are flushed
+ * at the same time.
+ */
+ public synchronized void deleteDocuments(Term[] terms) throws IOException {
+ for (int i = 0; i < terms.length; i++) {
+ bufferDeleteTerm(terms[i]);
+ }
+ maybeFlushRamSegments();
+ }
+
+ // buffer a delete term in bufferedDeleteTerms. for each term we also record
+ // the current number of ram segments (one per document buffered in ram), so
+ // that the delete term will later be applied to exactly those ram segments
+ // as well as to the disk segments
+ private void bufferDeleteTerm(Term term) {
+ Num num = (Num)bufferedDeleteTerms.get(term);
+ if (num == null) {
+ bufferedDeleteTerms.put(term, new Num(getRAMSegmentCount()));
+ } else {
+ num.setNum(getRAMSegmentCount());
+ }
+ numBufferedDeleteTerms++;
+ }
+
+ // a flush is triggered if enough new documents are buffered or
+ // if enough delete terms are buffered
+ protected boolean timeToFlushRam() {
+ return super.timeToFlushRam()
+ || numBufferedDeleteTerms >= maxBufferedDeleteTerms;
+ }
+
+ protected boolean anythingToFlushRam() {
+ return super.anythingToFlushRam() || bufferedDeleteTerms.size() > 0;
+ }
+
+ protected void doAfterFlushRamSegments(SegmentInfos segmentInfos,
+ boolean flushedRamSegments,
+ Vector readersToClose) throws IOException {
+ if (bufferedDeleteTerms.size() > 0) {
+ if (flushedRamSegments) {
+ IndexReader reader =
+ SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1));
+ // apply delete terms to the segment just flushed from ram
+ // apply appropriately so that a delete term is only applied to
+ // the documents buffered before it, not those buffered after it
+ applyDeletesSelectively(bufferedDeleteTerms, reader);
+ reader.close();
+ }
+
+ int readersStart = readersToClose.size();
+ int infosEnd = segmentInfos.size();
+ if (flushedRamSegments) { infosEnd--; }
+
+ for (int i = 0; i < infosEnd; i++) {
+ IndexReader reader = SegmentReader.get(segmentInfos.info(i));
+ readersToClose.addElement(reader);
+ }
+ // apply delete terms to disk segments except the one just flushed from ram
+ applyDeletes(bufferedDeleteTerms, readersToClose, readersStart);
+
+ // clean up bufferedDeleteTerms
+ bufferedDeleteTerms.clear();
+ numBufferedDeleteTerms = 0;
+ }
+ }
+
+ // apply buffered delete terms to the segment just flushed from ram
+ // apply appropriately so that a delete term is only applied to
+ // the documents buffered before it, not those buffered after it
+ private final void applyDeletesSelectively(HashMap deleteTerms,
+ IndexReader reader) throws IOException {
+ Iterator iter = deleteTerms.entrySet().iterator();
+
+ while (iter.hasNext()) {
+ Entry entry = (Entry)iter.next();
+ Term term = (Term)entry.getKey();
+
+ TermDocs docs = reader.termDocs(term);
+ if (docs != null) {
+ int num = ((Num)entry.getValue()).getNum();
+ try {
+ while (docs.next()) {
+ int doc = docs.doc();
+ if (doc >= num) {
+ break;
+ }
+ reader.deleteDocument(doc);
+ }
+ } finally {
+ docs.close();
+ }
+ }
+ }
+ }
+
+ // apply buffered delete terms to disk segments except the one just flushed from ram
+ private final void applyDeletes(HashMap deleteTerms, Vector readers, int start)
+ throws IOException {
+ for (int i = start; i < readers.size(); i++) {
+ IndexReader reader = (IndexReader)readers.elementAt(i);
+ Iterator iter = deleteTerms.entrySet().iterator();
+
+ while (iter.hasNext()) {
+ Entry entry = (Entry)iter.next();
+ Term term = (Term)entry.getKey();
+ reader.deleteDocuments(term);
+ }
+ }
+ }
+}
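
For orientation, here is a minimal usage sketch of the buffered-delete API added above. It is illustrative only and not part of the patch: the class name NewIndexModifierUsageSketch is made up, and it uses only calls that appear in this patch and in TestNewIndexModifierDelete (the Directory constructor, setMaxBufferedDeleteTerms, addDocument, deleteDocuments, flush, close).

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.NewIndexModifier;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

// Sketch only: shows inserts and deletes going through a single NewIndexModifier.
public class NewIndexModifierUsageSketch {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    NewIndexModifier modifier =
        new NewIndexModifier(dir, new WhitespaceAnalyzer(), true);
    // Inserts and delete terms are buffered in ram; a flush to disk happens once
    // enough documents or delete terms accumulate, or on flush()/close().
    modifier.setMaxBufferedDeleteTerms(10);

    Document doc = new Document();
    doc.add(new Field("id", "1", Field.Store.YES, Field.Index.UN_TOKENIZED));
    doc.add(new Field("city", "Amsterdam", Field.Store.YES,
        Field.Index.TOKENIZED));
    modifier.addDocument(doc);                     // buffered insert

    modifier.deleteDocuments(new Term("id", "1")); // buffered delete term
    modifier.flush();                              // force buffered work to disk
    modifier.close();
  }
}
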
Index: java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- java/org/apache/lucene/index/IndexWriter.java (revision 448601)
+++ java/org/apache/lucene/index/IndexWriter.java (working copy)
@@ -725,15 +725,28 @@
// counts x and y, then f(x) >= f(y).
// 2: The number of committed segments on the same level (f(n)) <= M.
- private final void maybeFlushRamSegments() throws IOException {
- if (ramSegmentInfos.size() >= minMergeDocs) {
+ protected boolean timeToFlushRam() {
+ return ramSegmentInfos.size() >= minMergeDocs;
+ }
+
+ protected boolean anythingToFlushRam() {
+ return ramSegmentInfos.size() > 0;
+ }
+
+ protected void doAfterFlushRamSegments(SegmentInfos segmentInfos,
+ boolean flushedRamSegments,
+ Vector readersToClose) throws IOException {
+ }
+
+ protected final void maybeFlushRamSegments() throws IOException {
+ if (timeToFlushRam()) {
flushRamSegments();
}
}
/** Merges all RAM-resident segments, then may merge segments. */
- private final void flushRamSegments() throws IOException {
- if (ramSegmentInfos.size() > 0) {
+ protected final void flushRamSegments() throws IOException {
+ if (anythingToFlushRam()) {
mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size());
maybeMergeSegments();
}
@@ -806,10 +819,15 @@
private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end)
throws IOException {
final String mergedName = newSegmentName();
+ final Vector segmentsToDelete = new Vector();
+ final Vector readersToClose = new Vector();
+ SegmentMerger merger = null;
+ int mergedDocCount = 0;
+
+ if (end > 0) {
if (infoStream != null) infoStream.print("merging segments");
- SegmentMerger merger = new SegmentMerger(this, mergedName);
+ merger = new SegmentMerger(this, mergedName);
- final Vector segmentsToDelete = new Vector();
for (int i = minSegment; i < end; i++) {
SegmentInfo si = sourceSegments.info(i);
if (infoStream != null)
@@ -821,7 +839,7 @@
segmentsToDelete.addElement(reader); // queue segment for deletion
}
- int mergedDocCount = merger.merge();
+ mergedDocCount = merger.merge();
if (infoStream != null) {
infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)");
@@ -840,10 +858,19 @@
// close readers before we attempt to delete now-obsolete segments
merger.closeReaders();
+ }
+ if (sourceSegments == ramSegmentInfos) {
+ doAfterFlushRamSegments(segmentInfos, end > 0, readersToClose);
+ }
+
synchronized (directory) { // in- & inter-process sync
new Lock.With(directory.makeLock(COMMIT_LOCK_NAME), commitLockTimeout) {
public Object doBody() throws IOException {
+ // commit delete files
+ for (int i = 0; i < readersToClose.size(); i++) {
+ ((IndexReader)readersToClose.elementAt(i)).close();
+ }
segmentInfos.write(directory); // commit before deleting
return null;
}
@@ -852,7 +879,7 @@
deleteSegments(segmentsToDelete); // delete now-unused segments
- if (useCompoundFile) {
+ if (useCompoundFile && end > 0) {
final Vector filesToDelete = merger.createCompoundFile(mergedName + ".tmp");
synchronized (directory) { // in- & inter-process sync
new Lock.With(directory.makeLock(COMMIT_LOCK_NAME), commitLockTimeout) {