Index: CHANGES.txt =================================================================== --- CHANGES.txt (revision 631207) +++ CHANGES.txt (working copy) @@ -101,6 +101,10 @@ huge index might not stop within the specified time. (Sean Timm via Doron Cohen) + 8. LUCENE-1194: Added IndexWriter.deleteDocuments(Query) to delete + documents matching the specified query. (Mike McCandless) + + Optimizations 1. LUCENE-705: When building a compound file, use Index: src/test/org/apache/lucene/index/TestIndexWriterDelete.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterDelete.java (revision 631207) +++ src/test/org/apache/lucene/index/TestIndexWriterDelete.java (working copy) @@ -30,6 +30,7 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockRAMDirectory; +import org.apache.lucene.search.TermQuery; public class TestIndexWriterDelete extends LuceneTestCase { @@ -108,7 +109,6 @@ reader.close(); modifier.deleteDocuments(new Term("value", String.valueOf(value))); - modifier.deleteDocuments(new Term("value", String.valueOf(value))); modifier.commit(); @@ -120,41 +120,62 @@ } } + public void testMaxBufferedDeletes() throws IOException { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, false, + new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + writer.setMaxBufferedDeleteTerms(1); + writer.deleteDocuments(new Term("foobar", "1")); + writer.deleteDocuments(new Term("foobar", "1")); + writer.deleteDocuments(new Term("foobar", "1")); + assertEquals(3, writer.getFlushDeletesCount()); + writer.close(); + dir.close(); + } + // test when delete terms only apply to ram segments public void testRAMDeletes() throws IOException { for(int pass=0;pass<2;pass++) { - boolean autoCommit = (0==pass); - Directory dir = new MockRAMDirectory(); - IndexWriter modifier = new IndexWriter(dir, autoCommit, - new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); - modifier.setMaxBufferedDocs(4); - modifier.setMaxBufferedDeleteTerms(4); + for(int t=0;t<2;t++) { + boolean autoCommit = (0==pass); + Directory dir = new MockRAMDirectory(); + IndexWriter modifier = new IndexWriter(dir, autoCommit, + new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + modifier.setMaxBufferedDocs(4); + modifier.setMaxBufferedDeleteTerms(4); - int id = 0; - int value = 100; + int id = 0; + int value = 100; - addDoc(modifier, ++id, value); - modifier.deleteDocuments(new Term("value", String.valueOf(value))); - addDoc(modifier, ++id, value); - modifier.deleteDocuments(new Term("value", String.valueOf(value))); + addDoc(modifier, ++id, value); + if (0 == t) + modifier.deleteDocuments(new Term("value", String.valueOf(value))); + else + modifier.deleteDocuments(new TermQuery(new Term("value", String.valueOf(value)))); + addDoc(modifier, ++id, value); + if (0 == t) { + modifier.deleteDocuments(new Term("value", String.valueOf(value))); + assertEquals(2, modifier.getNumBufferedDeleteTerms()); + assertEquals(1, modifier.getBufferedDeleteTermsSize()); + } + else + modifier.deleteDocuments(new TermQuery(new Term("value", String.valueOf(value)))); - assertEquals(2, modifier.getNumBufferedDeleteTerms()); - assertEquals(1, modifier.getBufferedDeleteTermsSize()); + addDoc(modifier, ++id, value); + assertEquals(0, modifier.getSegmentCount()); + modifier.flush(); - addDoc(modifier, ++id, value); - assertEquals(0, modifier.getSegmentCount()); - 
modifier.flush(); + modifier.commit(); - modifier.commit(); + IndexReader reader = IndexReader.open(dir); + assertEquals(1, reader.numDocs()); - IndexReader reader = IndexReader.open(dir); - assertEquals(1, reader.numDocs()); - - int hitCount = getHitCount(dir, new Term("id", String.valueOf(id))); - assertEquals(1, hitCount); - reader.close(); - modifier.close(); - dir.close(); + int hitCount = getHitCount(dir, new Term("id", String.valueOf(id))); + assertEquals(1, hitCount); + reader.close(); + modifier.close(); + dir.close(); + } } } Index: src/test/org/apache/lucene/index/TestIndexWriter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriter.java (revision 631207) +++ src/test/org/apache/lucene/index/TestIndexWriter.java (working copy) @@ -1353,7 +1353,7 @@ assertTrue(flushCount > lastFlushCount); lastFlushCount = flushCount; writer.setRAMBufferSizeMB(0.000001); - writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); + writer.setMaxBufferedDeleteTerms(1); } else if (j < 20) { assertTrue(flushCount > lastFlushCount); lastFlushCount = flushCount; @@ -1366,6 +1366,7 @@ } else if (30 == j) { writer.setRAMBufferSizeMB(0.000001); writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); + writer.setMaxBufferedDeleteTerms(1); } else if (j < 40) { assertTrue(flushCount> lastFlushCount); lastFlushCount = flushCount; @@ -1554,7 +1555,7 @@ doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS)); for(int i=0;i<19;i++) writer.addDocument(doc); - writer.flush(false, true); + writer.flush(false, true, true); writer.close(); SegmentInfos sis = new SegmentInfos(); sis.read(dir); Index: src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java =================================================================== --- src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (revision 631207) +++ src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (working copy) @@ -29,6 +29,9 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.index.Term; + public class TestAddIndexesNoOptimize extends LuceneTestCase { public void testSimpleCase() throws IOException { // main directory @@ -122,6 +125,118 @@ verifyTermDocs(dir, new Term("content", "bbb"), 51); } + public void testWithPendingDeletes() throws IOException { + // main directory + Directory dir = new RAMDirectory(); + // auxiliary directory + Directory aux = new RAMDirectory(); + + setUpDirs(dir, aux); + IndexWriter writer = newWriter(dir, false); + writer.addIndexesNoOptimize(new Directory[] {aux}); + + // Adds 10 docs, then replaces them with another 10 + // docs, so 10 pending deletes: + for (int i = 0; i < 20; i++) { + Document doc = new Document(); + doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add(new Field("content", "bbb " + i, Field.Store.NO, + Field.Index.TOKENIZED)); + writer.updateDocument(new Term("id", "" + (i%10)), doc); + } + // Deletes one of the 10 added docs, leaving 9: + PhraseQuery q = new PhraseQuery(); + q.add(new Term("content", "bbb")); + q.add(new Term("content", "14")); + writer.deleteDocuments(q); + + writer.optimize(); + + verifyNumDocs(dir, 1039); + verifyTermDocs(dir, new Term("content", "aaa"), 1030); + verifyTermDocs(dir, new Term("content", "bbb"), 9); + + writer.close(); + dir.close(); + aux.close(); + } + + 
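
These testWithPendingDeletes variants rely on the fact that a buffered delete (by Term or by Query) only applies to documents added before the deleteDocuments call; the delete is recorded together with a docID limit, so documents added afterwards survive the flush. A minimal sketch of that behavior, reusing the constructor, field flags and query types already used in these tests (the "id" field and its value are illustrative):

    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, false,
        new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED);

    Document doc = new Document();
    doc.add(new Field("id", "1", Field.Store.NO, Field.Index.UN_TOKENIZED));
    writer.addDocument(doc);                       // added before the delete: will be removed

    writer.deleteDocuments(new TermQuery(new Term("id", "1")));   // buffered delete-by-query

    Document later = new Document();
    later.add(new Field("id", "1", Field.Store.NO, Field.Index.UN_TOKENIZED));
    writer.addDocument(later);                     // added after the delete: survives

    writer.close();                                // flushes buffered docs and deletes, then commits

    IndexReader reader = IndexReader.open(dir);
    // expect reader.numDocs() == 1: only the document added after the buffered delete remains
    reader.close();
    dir.close();
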
public void testWithPendingDeletes2() throws IOException { + // main directory + Directory dir = new RAMDirectory(); + // auxiliary directory + Directory aux = new RAMDirectory(); + + setUpDirs(dir, aux); + IndexWriter writer = newWriter(dir, false); + + // Adds 10 docs, then replaces them with another 10 + // docs, so 10 pending deletes: + for (int i = 0; i < 20; i++) { + Document doc = new Document(); + doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add(new Field("content", "bbb " + i, Field.Store.NO, + Field.Index.TOKENIZED)); + writer.updateDocument(new Term("id", "" + (i%10)), doc); + } + + writer.addIndexesNoOptimize(new Directory[] {aux}); + + // Deletes one of the 10 added docs, leaving 9: + PhraseQuery q = new PhraseQuery(); + q.add(new Term("content", "bbb")); + q.add(new Term("content", "14")); + writer.deleteDocuments(q); + + writer.optimize(); + + verifyNumDocs(dir, 1039); + verifyTermDocs(dir, new Term("content", "aaa"), 1030); + verifyTermDocs(dir, new Term("content", "bbb"), 9); + + writer.close(); + dir.close(); + aux.close(); + } + + public void testWithPendingDeletes3() throws IOException { + // main directory + Directory dir = new RAMDirectory(); + // auxiliary directory + Directory aux = new RAMDirectory(); + + setUpDirs(dir, aux); + IndexWriter writer = newWriter(dir, false); + + // Adds 10 docs, then replaces them with another 10 + // docs, so 10 pending deletes: + for (int i = 0; i < 20; i++) { + Document doc = new Document(); + doc.add(new Field("id", "" + (i % 10), Field.Store.NO, Field.Index.UN_TOKENIZED)); + doc.add(new Field("content", "bbb " + i, Field.Store.NO, + Field.Index.TOKENIZED)); + writer.updateDocument(new Term("id", "" + (i%10)), doc); + } + + // Deletes one of the 10 added docs, leaving 9: + PhraseQuery q = new PhraseQuery(); + q.add(new Term("content", "bbb")); + q.add(new Term("content", "14")); + writer.deleteDocuments(q); + + writer.addIndexesNoOptimize(new Directory[] {aux}); + + writer.optimize(); + + verifyNumDocs(dir, 1039); + verifyTermDocs(dir, new Term("content", "aaa"), 1030); + verifyTermDocs(dir, new Term("content", "bbb"), 9); + + writer.close(); + dir.close(); + aux.close(); + } + // case 0: add self or exceed maxMergeDocs, expect exception public void testAddSelf() throws IOException { // main directory Index: src/java/org/apache/lucene/index/SegmentMerger.java =================================================================== --- src/java/org/apache/lucene/index/SegmentMerger.java (revision 631207) +++ src/java/org/apache/lucene/index/SegmentMerger.java (working copy) @@ -436,10 +436,21 @@ private final void mergeTermInfos() throws CorruptIndexException, IOException { int base = 0; - for (int i = 0; i < readers.size(); i++) { + final int readerCount = readers.size(); + for (int i = 0; i < readerCount; i++) { IndexReader reader = (IndexReader) readers.elementAt(i); TermEnum termEnum = reader.terms(); SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader); + int[] docMap = smi.getDocMap(); + if (docMap != null) { + if (docMaps == null) { + docMaps = new int[readerCount][]; + delCounts = new int[readerCount]; + } + docMaps[i] = docMap; + delCounts[i] = smi.reader.maxDoc() - smi.reader.numDocs(); + } + base += reader.numDocs(); if (smi.next()) queue.put(smi); // initialize queue @@ -504,7 +515,15 @@ return df; } - private byte[] payloadBuffer = null; + private byte[] payloadBuffer; + private int[][] docMaps; + int[][] getDocMaps() { + return docMaps; + } + private int[] 
delCounts;
+  int[] getDelCounts() {
+    return delCounts;
+  }
 
   /** Process postings from multiple segments all positioned on the
    *  same term.  Writes out merged entries into freqOutput and
Index: src/java/org/apache/lucene/index/MergeDocIDRemapper.java
===================================================================
--- src/java/org/apache/lucene/index/MergeDocIDRemapper.java	(revision 0)
+++ src/java/org/apache/lucene/index/MergeDocIDRemapper.java	(revision 0)
@@ -0,0 +1,110 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Remaps docIDs after a merge has completed, where the
+ *  merged segments had at least one deletion.  This is used
+ *  to renumber the buffered deletes in IndexWriter when a
+ *  merge of segments with deletions commits. */
+
+final class MergeDocIDRemapper {
+  int[] starts;                                 // used for binary search of mapped docID
+  int[] newStarts;                              // starts, minus the deletes
+  int[][] docMaps;                              // maps docIDs in the merged set
+  int minDocID;                                 // minimum docID that needs renumbering
+  int maxDocID;                                 // 1+ the max docID that needs renumbering
+  int docShift;                                 // total # deleted docs that were compacted by this merge
+
+  public MergeDocIDRemapper(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergedDocCount) {
+    this.docMaps = docMaps;
+    SegmentInfo firstSegment = merge.segments.info(0);
+    int i = 0;
+    while(true) {
+      SegmentInfo info = infos.info(i);
+      if (info.equals(firstSegment))
+        break;
+      minDocID += info.docCount;
+      i++;
+    }
+
+    int numDocs = 0;
+    for(int j=0;j<docMaps.length;i++,j++) {
+      numDocs += infos.info(i).docCount;
+      assert infos.info(i).equals(merge.segments.info(j));
+    }
+    maxDocID = minDocID + numDocs;
+
+    starts = new int[docMaps.length];
+    newStarts = new int[docMaps.length];
+
+    starts[0] = minDocID;
+    newStarts[0] = minDocID;
+    for(int j=1;j<docMaps.length;j++) {
+      final int lastDocCount = merge.segments.info(j-1).docCount;
+      starts[j] = starts[j-1] + lastDocCount;
+      newStarts[j] = newStarts[j-1] + lastDocCount - delCounts[j-1];
+    }
+    docShift = numDocs - mergedDocCount;
+
+    // There are rare cases when docShift is 0.  It happens
+    // if you try to delete a docID that's out of bounds,
+    // because the SegmentReader still allocates deletedDocs
+    // and pretends it has deletions ... so we can't make
+    // this assert here
+    //assert docShift > 0;
+
+    // Make sure it all adds up:
+    assert docShift == maxDocID - (newStarts[docMaps.length-1] + merge.segments.info(docMaps.length-1).docCount - delCounts[docMaps.length-1]);
+  }
+
+  public int remap(int oldDocID) {
+    if (oldDocID < minDocID)
+      // Unaffected by merge
+      return oldDocID;
+    else if (oldDocID >= maxDocID)
+      // This doc was "after" the merge, so simple shift
+      return oldDocID - docShift;
+    else {
+      // Binary search to locate this document & find its new docID
+      int lo = 0;                                      // search starts array
+      int hi = docMaps.length - 1;                  // for first element less
+
+      while (hi >= lo) {
+        int mid = (lo + hi) >> 1;
+        int midValue = starts[mid];
+        if (oldDocID < midValue)
+          hi = mid - 1;
+        else if (oldDocID > midValue)
+          lo = mid + 1;
+        else {                                      // found a match
+          while (mid+1 < docMaps.length && starts[mid+1] == midValue) {
+            mid++;                                  // scan to last match
+          }
+          if (docMaps[mid] != null)
+            return newStarts[mid] + docMaps[mid][oldDocID-starts[mid]];
+          else
+            return newStarts[mid] + oldDocID-starts[mid];
+        }
+      }
+      if (docMaps[hi] != null)
+        return newStarts[hi] + docMaps[hi][oldDocID-starts[hi]];
+      else
+        return newStarts[hi] + oldDocID-starts[hi];
+    }
+  }
+}
Property changes on:
src/java/org/apache/lucene/index/MergeDocIDRemapper.java ___________________________________________________________________ Name: svn:eol-style + native Index: src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- src/java/org/apache/lucene/index/DocumentsWriter.java (revision 631207) +++ src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) @@ -23,6 +23,10 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Fieldable; import org.apache.lucene.search.Similarity; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexInput; @@ -33,9 +37,11 @@ import java.io.PrintStream; import java.io.Reader; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.HashMap; import java.util.ArrayList; +import java.util.Map.Entry; import java.text.NumberFormat; import java.util.Collections; @@ -154,12 +160,110 @@ private PrintStream infoStream; - // This Hashmap buffers delete terms in ram before they - // are applied. The key is delete term; the value is - // number of buffered documents the term applies to. - private HashMap bufferedDeleteTerms = new HashMap(); - private int numBufferedDeleteTerms = 0; + // Holds buffered deletes, by docID, term or query. We + // hold two instances of this class: one for the deletes + // prior to the last flush, the other for deletes after + // the last flush. This is so if we need to abort + // (discard all buffered docs) we can also discard the + // buffered deletes yet keep the deletes done during + // previously flushed segments. 
+ private static class BufferedDeletes { + int numTerms; + HashMap terms = new HashMap(); + HashMap queries = new HashMap(); + List docIDs = new ArrayList(); + private void update(BufferedDeletes in) { + numTerms += in.numTerms; + terms.putAll(in.terms); + queries.putAll(in.queries); + docIDs.addAll(in.docIDs); + in.terms.clear(); + in.numTerms = 0; + in.queries.clear(); + in.docIDs.clear(); + } + + void clear() { + terms.clear(); + queries.clear(); + docIDs.clear(); + numTerms = 0; + } + + boolean any() { + return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0; + } + + // Remaps all buffered deletes based on a completed + // merge + synchronized void remap(MergeDocIDRemapper mapper, + SegmentInfos infos, + int[][] docMaps, + int[] delCounts, + MergePolicy.OneMerge merge, + int mergeDocCount) { + + final HashMap newDeleteTerms; + + // Remap delete-by-term + if (terms.size() > 0) { + newDeleteTerms = new HashMap(); + Iterator iter = terms.entrySet().iterator(); + while(iter.hasNext()) { + Entry entry = (Entry) iter.next(); + Num num = (Num) entry.getValue(); + newDeleteTerms.put(entry.getKey(), + new Num(mapper.remap(num.getNum()))); + } + } else + newDeleteTerms = null; + + // Remap delete-by-docID + final List newDeleteDocIDs; + + if (docIDs.size() > 0) { + newDeleteDocIDs = new ArrayList(docIDs.size()); + Iterator iter = docIDs.iterator(); + while(iter.hasNext()) { + Integer num = (Integer) iter.next(); + newDeleteDocIDs.add(new Integer(mapper.remap(num.intValue()))); + } + } else + newDeleteDocIDs = null; + + // Remap delete-by-query + final HashMap newDeleteQueries; + + if (queries.size() > 0) { + newDeleteQueries = new HashMap(queries.size()); + Iterator iter = queries.entrySet().iterator(); + while(iter.hasNext()) { + Entry entry = (Entry) iter.next(); + Integer num = (Integer) entry.getValue(); + newDeleteQueries.put(entry.getKey(), + new Integer(mapper.remap(num.intValue()))); + } + } else + newDeleteQueries = null; + + if (newDeleteTerms != null) + terms = newDeleteTerms; + if (newDeleteDocIDs != null) + docIDs = newDeleteDocIDs; + if (newDeleteQueries != null) + queries = newDeleteQueries; + } + } + + // Deletes done after the last flush; these are discarded + // on abort + private BufferedDeletes deletesInRAM = new BufferedDeletes(); + + // Deletes done before the last flush; these are still + // kept on abort + private BufferedDeletes deletesFlushed = new BufferedDeletes(); + // Currently used only for deleting a doc on hitting an non-aborting exception private List bufferedDeleteDocIDs = new ArrayList(); @@ -175,6 +279,18 @@ // non-zero we will flush by RAM usage instead. 
private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS; + private int flushedDocCount; // How many docs already flushed to index + + synchronized void updateFlushedDocCount(int n) { + flushedDocCount += n; + } + synchronized int getFlushedDocCount() { + return flushedDocCount; + } + synchronized void setFlushedDocCount(int n) { + flushedDocCount = n; + } + private boolean closed; // Coarse estimates used to measure RAM usage of buffered deletes @@ -188,7 +304,7 @@ DocumentsWriter(Directory directory, IndexWriter writer) throws IOException { this.directory = directory; this.writer = writer; - + flushedDocCount = writer.docCount(); postingsFreeList = new Posting[0]; } @@ -357,9 +473,7 @@ try { - bufferedDeleteTerms.clear(); - bufferedDeleteDocIDs.clear(); - numBufferedDeleteTerms = 0; + deletesInRAM.clear(); try { abortedFiles = files(); @@ -547,6 +661,8 @@ newFiles.addAll(writeSegment()); + flushedDocCount += docCount; + success = true; } finally { @@ -2109,12 +2225,7 @@ } resetPostingsData(); - - nextDocID = 0; - nextWriteDocID = 0; - numDocsInRAM = 0; - files = null; - + // Maybe downsize postingsFreeList array if (postingsFreeList.length > 1.5*postingsFreeCount) { int newSize = postingsFreeList.length; @@ -2129,6 +2240,10 @@ return flushedFiles; } + synchronized void pushDeletes() { + deletesFlushed.update(deletesInRAM); + } + /** Returns the name of the file with this extension, on * the current segment we are working on. */ private String segmentFileName(String extension) { @@ -2427,16 +2542,8 @@ // Next, wait until my thread state is idle (in case // it's shared with other threads) and for threads to // not be paused nor a flush pending: - while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || abortCount > 0)) - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + waitReady(state); - if (closed) - throw new AlreadyClosedException("this IndexWriter is closed"); - if (segment == null) segment = writer.newSegmentName(); @@ -2530,55 +2637,72 @@ return state.doFlushAfter || timeToFlushDeletes(); } + // for testing synchronized int getNumBufferedDeleteTerms() { - return numBufferedDeleteTerms; + return deletesInRAM.numTerms; } + // for testing synchronized HashMap getBufferedDeleteTerms() { - return bufferedDeleteTerms; + return deletesInRAM.terms; } - synchronized List getBufferedDeleteDocIDs() { - return bufferedDeleteDocIDs; + /** Called whenever a merge has completed and the merged segments had deletions */ + synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) { + if (docMaps == null) + // The merged segments had no deletes so docIDs did not change and we have nothing to do + return; + MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount); + deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); + deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); + flushedDocCount -= mapper.docShift; } - // Reset buffered deletes. 
- synchronized void clearBufferedDeletes() throws IOException { - bufferedDeleteTerms.clear(); - bufferedDeleteDocIDs.clear(); - numBufferedDeleteTerms = 0; - if (numBytesUsed > 0) - resetPostingsData(); - } - - synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException { - while(pauseThreads != 0 || flushPending) + synchronized private void waitReady(ThreadState state) { + while(!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || abortCount > 0)) try { wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - for (int i = 0; i < terms.length; i++) - addDeleteTerm(terms[i], numDocsInRAM); + + if (closed) + throw new AlreadyClosedException("this IndexWriter is closed"); + } + + synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException { + waitReady(null); + for (int i = 0; i < terms.length; i++) + addDeleteTerm(terms[i], numDocsInRAM); return timeToFlushDeletes(); } synchronized boolean bufferDeleteTerm(Term term) throws IOException { - while(pauseThreads != 0 || flushPending) - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + waitReady(null); addDeleteTerm(term, numDocsInRAM); return timeToFlushDeletes(); } + synchronized boolean bufferDeleteQueries(Query[] queries) throws IOException { + waitReady(null); + for (int i = 0; i < queries.length; i++) + addDeleteQuery(queries[i], numDocsInRAM); + return timeToFlushDeletes(); + } + + synchronized boolean bufferDeleteQuery(Query query) throws IOException { + waitReady(null); + addDeleteQuery(query, numDocsInRAM); + return timeToFlushDeletes(); + } + + synchronized boolean deletesFull() { + return maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH + && ((deletesInRAM.numTerms + deletesInRAM.queries.size() + deletesInRAM.docIDs.size()) >= maxBufferedDeleteTerms); + } + synchronized private boolean timeToFlushDeletes() { - return (bufferIsFull - || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH - && numBufferedDeleteTerms >= maxBufferedDeleteTerms)) - && setFlushPending(); + return (bufferIsFull || deletesFull()) && setFlushPending(); } void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { @@ -2590,9 +2714,110 @@ } synchronized boolean hasDeletes() { - return bufferedDeleteTerms.size() > 0 || bufferedDeleteDocIDs.size() > 0; + return deletesFlushed.any(); } + synchronized boolean applyDeletes(SegmentInfos infos) throws IOException { + + if (!hasDeletes()) + return false; + + if (infoStream != null) + infoStream.println("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + + deletesFlushed.docIDs.size() + " deleted docIDs and " + + deletesFlushed.queries.size() + " deleted queries on " + + + infos.size() + " segments."); + + final int infosEnd = infos.size(); + + int docStart = 0; + boolean any = false; + for (int i = 0; i < infosEnd; i++) { + IndexReader reader = SegmentReader.get(infos.info(i), false); + boolean success = false; + try { + any |= applyDeletes(reader, docStart); + docStart += reader.maxDoc(); + success = true; + } finally { + if (reader != null) { + try { + if (success) + reader.doCommit(); + } finally { + reader.doClose(); + } + } + } + } + + deletesFlushed.clear(); + + return any; + } + + // Apply buffered delete terms, queries and docIDs to the + // provided reader + private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart) + throws CorruptIndexException, IOException { + + final int docEnd = docIDStart + reader.maxDoc(); + boolean any 
= false; + + // Delete by term + Iterator iter = deletesFlushed.terms.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = (Entry) iter.next(); + Term term = (Term) entry.getKey(); + + TermDocs docs = reader.termDocs(term); + if (docs != null) { + int limit = ((DocumentsWriter.Num) entry.getValue()).getNum(); + try { + while (docs.next()) { + int docID = docs.doc(); + if (docIDStart+docID >= limit) + break; + reader.deleteDocument(docID); + any = true; + } + } finally { + docs.close(); + } + } + } + + // Delete by docID + iter = deletesFlushed.docIDs.iterator(); + while(iter.hasNext()) { + int docID = ((Integer) iter.next()).intValue(); + if (docID >= docIDStart && docID < docEnd) { + reader.deleteDocument(docID-docIDStart); + any = true; + } + } + + // Delete by query + IndexSearcher searcher = new IndexSearcher(reader); + iter = deletesFlushed.queries.entrySet().iterator(); + while(iter.hasNext()) { + Entry entry = (Entry) iter.next(); + Query query = (Query) entry.getKey(); + int limit = ((Integer) entry.getValue()).intValue(); + Weight weight = query.weight(searcher); + Scorer scorer = weight.scorer(reader); + while(scorer.next()) { + final int docID = scorer.doc(); + if (docIDStart + docID >= limit) + break; + reader.deleteDocument(docID); + any = true; + } + } + searcher.close(); + return any; + } + // Number of documents a delete term applies to. static class Num { private int num; @@ -2622,29 +2847,26 @@ // delete term will be applied to those documents as well // as the disk segments. synchronized private void addDeleteTerm(Term term, int docCount) { - Num num = (Num) bufferedDeleteTerms.get(term); - if (num == null) { - bufferedDeleteTerms.put(term, new Num(docCount)); - // This is coarse approximation of actual bytes used: - numBytesUsed += (term.field().length() + term.text().length()) * BYTES_PER_CHAR - + 4 + 5 * OBJECT_HEADER_BYTES + 5 * OBJECT_POINTER_BYTES; - if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH - && numBytesUsed > ramBufferSize) { - bufferIsFull = true; - } - } else { - num.setNum(docCount); - } - numBufferedDeleteTerms++; + Num num = (Num) deletesInRAM.terms.get(term); + final int docIDUpto = flushedDocCount + docCount; + if (num == null) + deletesInRAM.terms.put(term, new Num(docIDUpto)); + else + num.setNum(docIDUpto); + deletesInRAM.numTerms++; } // Buffer a specific docID for deletion. Currently only // used when we hit a exception when adding a document - synchronized private void addDeleteDocID(int docId) { - bufferedDeleteDocIDs.add(new Integer(docId)); - numBytesUsed += OBJECT_HEADER_BYTES + BYTES_PER_INT + OBJECT_POINTER_BYTES; + synchronized private void addDeleteDocID(int docID) { + deletesInRAM.docIDs.add(new Integer(flushedDocCount+docID)); + //numBytesUsed += OBJECT_HEADER_BYTES + BYTES_PER_INT + OBJECT_POINTER_BYTES; } + synchronized private void addDeleteQuery(Query query, int docID) { + deletesInRAM.queries.put(query, new Integer(flushedDocCount + docID)); + } + /** Does the synchronized work to finish/flush the * inverted document. 
*/ private synchronized void finishDocument(ThreadState state) throws IOException, AbortException { Index: src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- src/java/org/apache/lucene/index/IndexWriter.java (revision 631207) +++ src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -20,6 +20,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.search.Similarity; +import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.Lock; @@ -40,6 +41,7 @@ import java.util.LinkedList; import java.util.Iterator; import java.util.Map.Entry; +import java.util.Arrays; /** An IndexWriter creates and maintains an index. @@ -60,7 +62,9 @@

  In either case, documents are added with <a
  href="#addDocument(org.apache.lucene.document.Document)">addDocument</a>
  and removed with <a
- href="#deleteDocuments(org.apache.lucene.index.Term)">deleteDocuments</a>.
+ href="#deleteDocuments(org.apache.lucene.index.Term)">deleteDocuments</a>
+ or <a
+ href="#deleteDocuments(org.apache.lucene.search.Query)">deleteDocuments</a>.
  A document can be updated with <a href="#updateDocument(org.apache.lucene.index.Term, org.apache.lucene.document.Document)">updateDocument</a>
  (which just deletes and then adds the entire document).
  When finished adding, deleting and updating documents, <a href="#close()">close</a> should be called.
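
To make the two deletion flavors mentioned in this javadoc concrete, a minimal sketch (the field names and values are illustrative, not taken from the patch):

    // Delete by exact term:
    writer.deleteDocuments(new Term("id", "42"));

    // Delete by arbitrary query (the API added by this patch):
    writer.deleteDocuments(new TermQuery(new Term("id", "42")));

    // Any query works; the tests use a PhraseQuery, for example:
    PhraseQuery q = new PhraseQuery();
    q.add(new Term("content", "bbb"));
    q.add(new Term("content", "14"));
    writer.deleteDocuments(q);

    // Buffered adds and deletes become visible to readers only after
    // commit() or close().
    writer.commit();
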

@@ -75,9 +79,10 @@ #setRAMBufferSizeMB}) or the number of added documents. The default is to flush when RAM usage hits 16 MB. For best indexing speed you should flush by RAM usage with a - large RAM buffer. You can also force a flush by calling - {@link #flush}. When a flush occurs, both pending deletes - and added documents are flushed to the index. A flush may + large RAM buffer. Note that flushing just moves the + internal buffered state in IndexWriter into the index, but + these changes are not visible to IndexReader until either + {@link #commit} or {@link #close} is called. A flush may also trigger one or more segment merges which by default run with a background thread so as not to block the addDocument calls (see below @@ -301,6 +306,7 @@ private SegmentInfos localRollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails private boolean localAutoCommit; // saved autoCommit during local transaction + private int localFlushedDocCount; // saved docWriter.getFlushedDocCount during local transaction private boolean autoCommit = true; // false if we should commit only on close private SegmentInfos segmentInfos = new SegmentInfos(); // the segments @@ -333,6 +339,7 @@ private boolean stopMerges; private int flushCount; + private int flushDeletesCount; private double maxSyncPauseSeconds = DEFAULT_MAX_SYNC_PAUSE_SECONDS; // Last (right most) SegmentInfo created by a merge @@ -1645,7 +1652,7 @@ // Only allow a new merge to be triggered if we are // going to wait for merges: - flush(waitForMerges, true); + flush(waitForMerges, true, true); mergePolicy.close(); @@ -1654,9 +1661,9 @@ mergeScheduler.close(); if (infoStream != null) - message("now call final sync()"); + message("now call final commit()"); - sync(true, 0); + commit(true, 0); if (infoStream != null) message("at close: " + segString()); @@ -1780,7 +1787,11 @@ /** Returns the number of documents currently in this index. */ public synchronized int docCount() { ensureOpen(); - int count = docWriter.getNumDocsInRAM(); + int count; + if (docWriter != null) + count = docWriter.getNumDocsInRAM(); + else + count = 0; for (int i = 0; i < segmentInfos.size(); i++) { SegmentInfo si = segmentInfos.info(i); count += si.docCount; @@ -1788,6 +1799,16 @@ return count; } + public synchronized boolean hasDeletions() throws IOException { + ensureOpen(); + if (docWriter.hasDeletes()) + return true; + for (int i = 0; i < segmentInfos.size(); i++) + if (segmentInfos.info(i).hasDeletions()) + return true; + return false; + } + /** * The maximum number of terms that will be indexed for a single field in a * document. This limits the amount of memory required for indexing, so that @@ -1882,7 +1903,7 @@ } } if (doFlush) - flush(true, false); + flush(true, false, false); } /** @@ -1895,7 +1916,7 @@ ensureOpen(); boolean doFlush = docWriter.bufferDeleteTerm(term); if (doFlush) - flush(true, false); + flush(true, false, false); } /** @@ -1910,10 +1931,38 @@ ensureOpen(); boolean doFlush = docWriter.bufferDeleteTerms(terms); if (doFlush) - flush(true, false); + flush(true, false, false); } /** + * Deletes the document(s) matching the provided query. 
+ * @param query the query to identify the documents to be deleted + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + */ + public void deleteDocuments(Query query) throws CorruptIndexException, IOException { + ensureOpen(); + boolean doFlush = docWriter.bufferDeleteQuery(query); + if (doFlush) + flush(true, false, false); + } + + /** + * Deletes the document(s) matching any of the provided queries. + * All deletes are flushed at the same time. + * @param queries array of queries to identify the documents + * to be deleted + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + */ + public void deleteDocuments(Query[] queries) throws CorruptIndexException, IOException { + ensureOpen(); + boolean doFlush = docWriter.bufferDeleteQueries(queries); + if (doFlush) + flush(true, false, false); + } + + /** * Updates a document by first deleting the document(s) * containing term and then adding the new * document. The delete and then add are atomic as seen @@ -1967,7 +2016,7 @@ } } if (doFlush) - flush(true, false); + flush(true, false, false); } // for test purpose @@ -1994,6 +2043,11 @@ return flushCount; } + // for test purpose + final synchronized int getFlushDeletesCount() { + return flushDeletesCount; + } + final String newSegmentName() { // Cannot synchronize on IndexWriter because that causes // deadlock @@ -2128,7 +2182,7 @@ if (infoStream != null) message("optimize: index now " + segString()); - flush(true, false); + flush(true, false, true); synchronized(this) { resetMergeExceptions(); @@ -2378,13 +2432,14 @@ localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); localAutoCommit = autoCommit; + localFlushedDocCount = docWriter.getFlushedDocCount(); if (localAutoCommit) { if (infoStream != null) message("flush at startTransaction"); - flush(true, false); + flush(true, false, false); // Turn off auto-commit during our local transaction: autoCommit = false; @@ -2405,6 +2460,7 @@ // First restore autoCommit in case we hit an exception below: autoCommit = localAutoCommit; + docWriter.setFlushedDocCount(localFlushedDocCount); // Keep the same segmentInfos instance but replace all // of its SegmentInfo instances. 
This is so the next @@ -2447,7 +2503,7 @@ if (autoCommit) { boolean success = false; try { - sync(true, 0); + commit(true, 0); success = true; } finally { if (!success) { @@ -2646,21 +2702,27 @@ ensureOpen(); if (infoStream != null) message("flush at addIndexes"); - flush(true, false); + flush(true, false, true); boolean success = false; startTransaction(); try { + int docCount = 0; for (int i = 0; i < dirs.length; i++) { SegmentInfos sis = new SegmentInfos(); // read infos from dir sis.read(dirs[i]); for (int j = 0; j < sis.size(); j++) { - segmentInfos.addElement(sis.info(j)); // add each info + final SegmentInfo info = sis.info(j); + docCount += info.docCount; + segmentInfos.addElement(info); // add each info } } + // Notify DocumentsWriter that the flushed count just increased + docWriter.updateFlushedDocCount(docCount); + optimize(); success = true; @@ -2708,7 +2770,7 @@ ensureOpen(); if (infoStream != null) message("flush at addIndexesNoOptimize"); - flush(true, false); + flush(true, false, true); boolean success = false; @@ -2716,6 +2778,8 @@ try { + int docCount = 0; + for (int i = 0; i < dirs.length; i++) { if (directory == dirs[i]) { // cannot add this index: segments may be deleted in merge before added @@ -2726,10 +2790,14 @@ sis.read(dirs[i]); for (int j = 0; j < sis.size(); j++) { SegmentInfo info = sis.info(j); + docCount += info.docCount; segmentInfos.addElement(info); // add each info } } + // Notify DocumentsWriter that the flushed count just increased + docWriter.updateFlushedDocCount(docCount); + maybeMerge(); // If after merging there remain segments in the index @@ -2802,6 +2870,7 @@ IndexReader sReader = null; try { + if (segmentInfos.size() == 1){ // add existing index, if any sReader = SegmentReader.get(segmentInfos.info(0)); merger.add(sReader); @@ -2827,6 +2896,9 @@ -1, null, false); segmentInfos.addElement(info); + // Notify DocumentsWriter that the flushed count just increased + docWriter.updateFlushedDocCount(docCount); + success = true; } finally { @@ -2885,7 +2957,7 @@ * @deprecated please call {@link #commit}) instead */ public final void flush() throws CorruptIndexException, IOException { - flush(true, false); + flush(true, false, true); } /** @@ -2914,8 +2986,8 @@ } private final void commit(boolean triggerMerges) throws CorruptIndexException, IOException { - flush(triggerMerges, true); - sync(true, 0); + flush(triggerMerges, true, true); + commit(true, 0); } /** @@ -2925,23 +2997,26 @@ * deletes or docs were flushed) if necessary * @param flushDocStores if false we are allowed to keep * doc stores open to share with the next segment + * @param flushDeletes whether pending deletes should also + * be flushed */ - protected final void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException { + protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { ensureOpen(); - - if (doFlush(flushDocStores) && triggerMerge) + if (doFlush(flushDocStores, flushDeletes) && triggerMerge) maybeMerge(); } // TODO: this method should not have to be entirely // synchronized, ie, merges should be allowed to commit // even while a flush is happening - private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException { + private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { // Make sure no threads are actively adding a document flushCount++; + 
flushDeletes |= docWriter.deletesFull(); + // Returns true if docWriter is currently aborting, in // which case we skip flushing this segment if (docWriter.pauseAllThreads()) { @@ -2965,15 +3040,6 @@ if (docStoreSegment == null) flushDocStores = false; - // Always flush deletes if there are any delete terms. - // TODO: when autoCommit=false we don't have to flush - // deletes with every flushed segment; we can save - // CPU/IO by buffering longer & flushing deletes only - // when they are full or writer is being closed. We - // have to fix the "applyDeletesSelectively" logic to - // apply to more than just the last flushed segment - boolean flushDeletes = docWriter.hasDeletes(); - int docStoreOffset = docWriter.getDocStoreOffset(); // docStoreOffset should only be non-zero when @@ -3049,54 +3115,12 @@ docStoreIsCompoundFile); } - if (flushDeletes) { - try { - SegmentInfos rollback = (SegmentInfos) segmentInfos.clone(); + docWriter.pushDeletes(); - boolean success = false; - try { - // we should be able to change this so we can - // buffer deletes longer and then flush them to - // multiple flushed segments only when a commit() - // finally happens - applyDeletes(newSegment); - success = true; - } finally { - if (!success) { - if (infoStream != null) - message("hit exception flushing deletes"); - - // Carefully remove any partially written .del - // files - final int size = rollback.size(); - for(int i=0;i= num) { - break; - } - reader.deleteDocument(doc); - } - } finally { - docs.close(); - } - } - } - - if (deleteIds.size() > 0) { - iter = deleteIds.iterator(); - while(iter.hasNext()) - reader.deleteDocument(((Integer) iter.next()).intValue()); - } - } - - // Apply buffered delete terms to this reader. - private final void applyDeletes(HashMap deleteTerms, IndexReader reader) - throws CorruptIndexException, IOException { - Iterator iter = deleteTerms.entrySet().iterator(); - while (iter.hasNext()) { - Entry entry = (Entry) iter.next(); - reader.deleteDocuments((Term) entry.getKey()); - } - } - // utility routines for tests SegmentInfo newestSegment() { return segmentInfos.info(segmentInfos.size()-1); @@ -3986,9 +3949,9 @@ * sync each file, if it wasn't already. If that * succeeds, then we write a new segments_N file & sync * that. */ - private void sync(boolean includeFlushes, long sizeInBytes) throws IOException { + private void commit(boolean includeFlushes, long sizeInBytes) throws IOException { - message("start sync() includeFlushes=" + includeFlushes); + message("start commit() includeFlushes=" + includeFlushes); if (!includeFlushes) syncPause(sizeInBytes); @@ -4004,7 +3967,7 @@ synchronized(this) { if (!commitPending) { - message(" skip sync(): no commit pending"); + message(" skip commit(): no commit pending"); return; }
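
To summarize the renumbering that MergeDocIDRemapper performs when a merge of segments with deletions commits, here is a self-contained toy model; the segment sizes are made up and the linear segment lookup stands in for the binary search over starts[] in the real class:

    // Toy model: segments 0 and 1 (5 docs each, 1 deletion each) are merged
    // into a single 8-doc segment, so buffered delete limits must be renumbered.
    public class RemapSketch {
      // Per-segment map from old position to compacted position; -1 marks a deleted doc.
      static int[][] docMaps  = { {0, -1, 1, 2, 3}, {0, 1, -1, 2, 3} };
      static int[] starts     = {0, 5};  // old absolute start of each merged segment
      static int[] newStarts  = {0, 4};  // new starts: segment 0 keeps 4 of its 5 docs
      static int minDocID = 0;           // first docID touched by the merge
      static int maxDocID = 10;          // one past the last docID touched by the merge
      static int docShift = 2;           // 10 old docs - 8 merged docs

      static int remap(int oldDocID) {
        if (oldDocID < minDocID) return oldDocID;              // before the merged range: unchanged
        if (oldDocID >= maxDocID) return oldDocID - docShift;  // after it: shift down by the compacted deletes
        int seg = (oldDocID < starts[1]) ? 0 : 1;              // locate the source segment
        return newStarts[seg] + docMaps[seg][oldDocID - starts[seg]];
      }

      public static void main(String[] args) {
        System.out.println(remap(3));   // 2: one earlier doc in segment 0 was deleted
        System.out.println(remap(8));   // 6: 4 survivors of segment 0 precede it, plus its in-segment map
        System.out.println(remap(12));  // 10: docs past the merged range shift down by docShift
      }
    }

This is the same arithmetic DocumentsWriter.remapDeletes applies to the docID limits stored with buffered delete terms, queries and docIDs, after which flushedDocCount is reduced by docShift.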