Index: lucene/src/java/org/apache/lucene/index/BufferedDeletes.java --- lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Mon Jan 24 13:33:12 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Mon Jan 24 15:33:57 2011 -0500 @@ -182,7 +182,8 @@ } if (infoStream != null) { - message("applyDeletes: seg=" + info + " segment's deletes=[" + (deletes == null ? "null" : deletes) + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "]"); + // nocommit + //message("applyDeletes: seg=" + info + " segment's deletes=[" + (deletes == null ? "null" : deletes) + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "]"); } hasDeletes |= deletes != null; @@ -195,7 +196,8 @@ any = true; } if (infoStream != null) { - message("deletes touched " + delCountInc + " docIDs"); + // nocommit + //message("deletes touched " + delCountInc + " docIDs"); } if (deletes != null) { Index: lucene/src/java/org/apache/lucene/index/CheckIndex.java --- lucene/src/java/org/apache/lucene/index/CheckIndex.java Mon Jan 24 13:33:12 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/CheckIndex.java Mon Jan 24 15:33:57 2011 -0500 @@ -454,11 +454,11 @@ final int numDocs = reader.numDocs(); toLoseDocCount = numDocs; if (reader.hasDeletions()) { - if (reader.deletedDocs.count() != info.getDelCount()) { - throw new RuntimeException("delete count mismatch: info=" + info.getDelCount() + " vs deletedDocs.count()=" + reader.deletedDocs.count()); + if (reader.deletedDocs.deletes.count() != info.getDelCount()) { + throw new RuntimeException("delete count mismatch: info=" + info.getDelCount() + " vs deletedDocs.count()=" + reader.deletedDocs.deletes.count()); } - if (reader.deletedDocs.count() > reader.maxDoc()) { - throw new RuntimeException("too many deleted docs: maxDoc()=" + reader.maxDoc() + " vs deletedDocs.count()=" + reader.deletedDocs.count()); + if (reader.deletedDocs.deletes.count() > reader.maxDoc()) { + 
throw new RuntimeException("too many deleted docs: maxDoc()=" + reader.maxDoc() + " vs deletedDocs.count()=" + reader.deletedDocs.deletes.count()); } if (info.docCount - numDocs != info.getDelCount()){ throw new RuntimeException("delete count mismatch: info=" + info.getDelCount() + " vs reader=" + (info.docCount - numDocs)); Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java --- lucene/src/java/org/apache/lucene/index/IndexWriter.java Mon Jan 24 13:33:12 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/IndexWriter.java Mon Jan 24 15:33:57 2011 -0500 @@ -3071,14 +3071,17 @@ // Hold onto the "live" reader; we will use this to // commit merged deletes + //System.out.println("IW: now pull merge reader seg=" + info.name); SegmentReader reader = merge.readers[i] = readerPool.get(info, true, MERGE_READ_BUFFER_SIZE, -config.getReaderTermsIndexDivisor()); + //System.out.println("IW: done pull merge reader seg=" + info.name); // We clone the segment readers because other // deletes may come in while we're merging so we // need readers that will not change SegmentReader clone = merge.readersClone[i] = (SegmentReader) reader.clone(true); + //System.out.println("IW: done clone merge reader clone seg=" + info.name); merger.add(clone); totDocCount += clone.numDocs(); Index: lucene/src/java/org/apache/lucene/index/SegmentDeleteVectors.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ lucene/src/java/org/apache/lucene/index/SegmentDeleteVectors.java Mon Jan 24 15:33:57 2011 -0500 @@ -0,0 +1,203 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.LinkedList; + +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BitVector; + +/* This class manages recycled delete BitVectors against a + * given segment. This is in order to make reopen time for + * a segment very fast: we pull a previous BitVector, + * replay the deletes log, and use it. + * + * This is different from SegmentDeletes because that class + * tracks pending deletes (ie, terms, queries to be + * deleted) not yet applied, while this class handles + * already applied deletes. + * + * Each deleted doc is enrolled into something like a + * transaction log; this enables us to replay deletes to + * catch up a previous BitVector to the current deletes. */ + +// TODO +// - existing thread hazard: if 1 thread clones an SR +// while another makes a delete against it, we may fail +// the COW +// - hmm: we could make a minimalist FrozenBitVector that +// has "frozen" bits, eg final byte[] +// - MUST prune the deletes log! +// - can i pull the COW logic out of SegmentReader...? +// - someday this can also be used for efficient writing +// of del docs to disk +// - make sure undeleteAll works +// - need to have a "maxFreeBV"? +// - need some "safety" for apps that don't open very +// often but do lots of deletes... +// - don't track in the non-NRT case +// - assert if a writeable is obtained it's the "latest" +// one... 
+ +class SegmentDeleteVectors { + + public final BitVector deletes; + + // nocommit -- must enable turning on/off pool tracking: + private final Pool pool; + private int refCount = 1; + public int gen; + private final String segment; + private boolean writeable; + + // Creates the first deletes in a chain of SRs. Ie, this + // is called when SR is first opened anew. All others in + // the linked chain of cloned/reopened SRs are either + // shared (incRef/decRef) or created by clone(): + public SegmentDeleteVectors(String segment, BitVector deletes) { + //System.out.println("SD: seg=" + segment + " make new size=" + deletes.size() + " this=" + this); + this.deletes = deletes; + this.gen = 0; + pool = new Pool(); + this.segment = segment; + } + + private SegmentDeleteVectors(String segment, BitVector deletes, Pool pool) { + this.gen = pool.gen(); + this.deletes = deletes; + this.pool = pool; + this.segment = segment; + } + + // A single instance of this class is shared across + // multiple SegmentDeleteVectors: + private static class Pool { + // nocommit -- must periodically prune this! + private int[] delDocsLog = new int[1]; + private int nextGen; + private final LinkedList freeList = new LinkedList(); + + public synchronized void delete(int docID) { + if (delDocsLog.length == nextGen) { + delDocsLog = ArrayUtil.grow(delDocsLog); + } + delDocsLog[nextGen++] = docID; + } + + public synchronized int gen() { + return nextGen; + } + + // Adds to free list + public synchronized void add(SegmentDeleteVectors delDocs) { + assert delDocs.gen <= nextGen; + //System.out.println(" add to freeList gen=" + delDocs.gen + " bv=" + delDocs.deletes); + freeList.add(delDocs); + } + + // Removes from free list & replays deletes: + public synchronized SegmentDeleteVectors get(String segment) { + // nocommit -- sweep and return "oldest"? 
+ if (freeList.size() > 0) { + SegmentDeleteVectors old = freeList.removeFirst(); + + // Catch this BitVector up to the current deletes by + // replaying the log: + //System.out.println("REUSE: seg=" + segment + " " + (nextGen-old.gen) + " dels to replay " + old + " bv=" + old.deletes); + for(;old.gen fieldsReaderLocal = new FieldsReaderLocal(); CloseableThreadLocal termVectorsLocal = new CloseableThreadLocal(); - volatile BitVector deletedDocs; - AtomicInteger deletedDocsRef = null; + // nocommit -- why not private? + volatile SegmentDeleteVectors deletedDocs; + private boolean deletedDocsDirty = false; private boolean normsDirty = false; - private int pendingDeleteCount; + private volatile int pendingDeleteCount; private boolean rollbackHasChanges = false; private boolean rollbackDeletedDocsDirty = false; @@ -520,13 +521,13 @@ @Override public Bits getDeletedDocs() { - return deletedDocs; + return deletedDocs == null ? null : deletedDocs.deletes; } private boolean checkDeletedCounts() throws IOException { - final int recomputedCount = deletedDocs.getRecomputedCount(); + final int recomputedCount = deletedDocs.deletes.getRecomputedCount(); - assert deletedDocs.count() == recomputedCount : "deleted count=" + deletedDocs.count() + " vs recomputed count=" + recomputedCount; + assert deletedDocs.deletes.count() == recomputedCount : "deleted count=" + deletedDocs.deletes.count() + " vs recomputed count=" + recomputedCount; assert si.getDelCount() == recomputedCount : "delete count mismatch: info=" + si.getDelCount() + " vs BitVector=" + recomputedCount; @@ -542,14 +543,14 @@ private void loadDeletedDocs() throws IOException { // NOTE: the bitvector is stored using the regular directory, not cfs if (hasDeletions(si)) { - deletedDocs = new BitVector(directory(), si.getDelFileName()); - deletedDocsRef = new AtomicInteger(1); + deletedDocs = new SegmentDeleteVectors(si.name, new BitVector(directory(), si.getDelFileName())); assert checkDeletedCounts(); - if 
(deletedDocs.size() != si.docCount) { - throw new CorruptIndexException("document count mismatch: deleted docs count " + deletedDocs.size() + " vs segment doc count " + si.docCount + " segment=" + si.name); + if (deletedDocs.deletes.size() != si.docCount) { + throw new CorruptIndexException("document count mismatch: deleted docs count " + deletedDocs.deletes.size() + " vs segment doc count " + si.docCount + " segment=" + si.name); } - } else + } else { assert si.getDelCount() == 0; + } } /** @@ -568,9 +569,12 @@ * @param bv BitVector to clone * @return New BitVector */ + // nocommit -- what to do... + /* protected BitVector cloneDeletedDocs(BitVector bv) { return (BitVector)bv.clone(); } + */ @Override public final synchronized Object clone() { @@ -644,19 +648,18 @@ if (doClone) { if (deletedDocs != null) { - deletedDocsRef.incrementAndGet(); + deletedDocs.incRef(); clone.deletedDocs = deletedDocs; - clone.deletedDocsRef = deletedDocsRef; } } else { if (!deletionsUpToDate) { - // load deleted docs + // load new deleted docs assert clone.deletedDocs == null; clone.loadDeletedDocs(); } else if (deletedDocs != null) { - deletedDocsRef.incrementAndGet(); + // share same deleted docs + deletedDocs.incRef(); clone.deletedDocs = deletedDocs; - clone.deletedDocsRef = deletedDocsRef; } } @@ -710,7 +713,7 @@ if (deletedDocsDirty) { // re-write deleted si.advanceDelGen(); - assert deletedDocs.length() == si.docCount; + assert deletedDocs.deletes.length() == si.docCount; // We can write directly to the actual name (vs to a // .tmp & renaming it) because the file is not live @@ -718,7 +721,7 @@ final String delFileName = si.getDelFileName(); boolean success = false; try { - deletedDocs.write(directory(), delFileName); + deletedDocs.deletes.write(directory(), delFileName); success = true; } finally { if (!success) { @@ -733,7 +736,7 @@ si.setDelCount(si.getDelCount()+pendingDeleteCount); pendingDeleteCount = 0; - assert deletedDocs.count() == si.getDelCount(): "delete count 
mismatch during commit: info=" + si.getDelCount() + " vs BitVector=" + deletedDocs.count(); + assert deletedDocs.deletes.count() == si.getDelCount(): "delete count mismatch during commit: info=" + si.getDelCount() + " vs BitVector=" + deletedDocs.deletes.count(); } else { assert pendingDeleteCount == 0; } @@ -761,7 +764,7 @@ fieldsReaderLocal.close(); if (deletedDocs != null) { - deletedDocsRef.decrementAndGet(); + deletedDocs.decRef(); // null so if an app hangs on to us we still free most ram deletedDocs = null; } @@ -796,36 +799,28 @@ @Override protected void doDelete(int docNum) { if (deletedDocs == null) { - deletedDocs = new BitVector(maxDoc()); - deletedDocsRef = new AtomicInteger(1); + deletedDocs = new SegmentDeleteVectors(si.name, new BitVector(maxDoc())); } - // there is more than 1 SegmentReader with a reference to this - // deletedDocs BitVector so decRef the current deletedDocsRef, - // clone the BitVector, create a new deletedDocsRef - if (deletedDocsRef.get() > 1) { - AtomicInteger oldRef = deletedDocsRef; - deletedDocs = cloneDeletedDocs(deletedDocs); - deletedDocsRef = new AtomicInteger(1); - oldRef.decrementAndGet(); + + // nocommit -- isn't there thread hazard here? one + // thread cloning this SR and another doing deletes.. 
+ deletedDocs = deletedDocs.getWriteable(); + deletedDocsDirty = true; + if (deletedDocs.delete(docNum)) { + pendingDeleteCount++; } - deletedDocsDirty = true; - if (!deletedDocs.getAndSet(docNum)) - pendingDeleteCount++; } @Override protected void doUndeleteAll() { deletedDocsDirty = false; if (deletedDocs != null) { - assert deletedDocsRef != null; - deletedDocsRef.decrementAndGet(); + deletedDocs.decRef(); deletedDocs = null; - deletedDocsRef = null; pendingDeleteCount = 0; si.clearDelGen(); si.setDelCount(0); } else { - assert deletedDocsRef == null; assert pendingDeleteCount == 0; } } @@ -865,8 +860,9 @@ public int numDocs() { // Don't call ensureOpen() here (it could affect performance) int n = maxDoc(); - if (deletedDocs != null) - n -= deletedDocs.count(); + if (deletedDocs != null) { + n -= deletedDocs.deletes.count(); + } return n; } Index: lucene/src/test/org/apache/lucene/index/TestIndexReaderClone.java --- lucene/src/test/org/apache/lucene/index/TestIndexReaderClone.java Mon Jan 24 13:33:12 2011 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexReaderClone.java Mon Jan 24 15:33:57 2011 -0500 @@ -316,7 +316,7 @@ origSegmentReader.deleteDocument(10); assertDelDocsRefCountEquals(1, origSegmentReader); origSegmentReader.undeleteAll(); - assertNull(origSegmentReader.deletedDocsRef); + assertNull(origSegmentReader.deletedDocs); origSegmentReader.close(); // need to test norms? 
dir1.close(); @@ -348,7 +348,7 @@ IndexReader origReader = IndexReader.open(dir1, false); SegmentReader origSegmentReader = getOnlySegmentReader(origReader); // deletedDocsRef should be null because nothing has updated yet - assertNull(origSegmentReader.deletedDocsRef); + assertNull(origSegmentReader.deletedDocs); // we deleted a document, so there is now a deletedDocs bitvector and a // reference to it @@ -448,7 +448,7 @@ } private void assertDelDocsRefCountEquals(int refCount, SegmentReader reader) { - assertEquals(refCount, reader.deletedDocsRef.get()); + assertEquals(refCount, reader.deletedDocs.getRefCount()); } public void testCloneSubreaders() throws Exception { Index: lucene/src/test/org/apache/lucene/index/TestIndexReaderReopen.java --- lucene/src/test/org/apache/lucene/index/TestIndexReaderReopen.java Mon Jan 24 13:33:12 2011 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexReaderReopen.java Mon Jan 24 15:33:57 2011 -0500 @@ -1162,11 +1162,11 @@ // At this point they share the same BitVector assertTrue(sr1.deletedDocs==sr2.deletedDocs); - final BitVector delDocs = sr1.deletedDocs; + final BitVector delDocs = sr1.deletedDocs.deletes; r1.close(); r2.deleteDocument(0); - assertTrue(delDocs==sr2.deletedDocs); + assertTrue(delDocs==sr2.deletedDocs.deletes); r2.close(); dir.close(); } Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterReader.java --- lucene/src/test/org/apache/lucene/index/TestIndexWriterReader.java Mon Jan 24 13:33:12 2011 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterReader.java Mon Jan 24 15:33:57 2011 -0500 @@ -45,7 +45,6 @@ import java.util.concurrent.atomic.AtomicInteger; public class TestIndexWriterReader extends LuceneTestCase { - static PrintStream infoStream; public static int count(Term t, IndexReader r) throws IOException { int count = 0; @@ -234,7 +233,7 @@ } IndexWriter writer = new IndexWriter(dir1, iwc); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? 
System.out : null); // create the index createIndexNoClose(!optimize, "index1", writer); writer.flush(false, true); @@ -242,7 +241,7 @@ // create a 2nd index Directory dir2 = newDirectory(); IndexWriter writer2 = new IndexWriter(dir2, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())); - writer2.setInfoStream(infoStream); + writer2.setInfoStream(VERBOSE ? System.out : null); createIndexNoClose(!optimize, "index2", writer2); writer2.close(); @@ -280,12 +279,12 @@ Directory dir1 = newDirectory(); IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? System.out : null); // create a 2nd index Directory dir2 = newDirectory(); IndexWriter writer2 = new IndexWriter(dir2, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())); - writer2.setInfoStream(infoStream); + writer2.setInfoStream(VERBOSE ? System.out : null); createIndexNoClose(!optimize, "index2", writer2); writer2.close(); @@ -314,7 +313,7 @@ Directory dir1 = newDirectory(); IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setReaderTermsIndexDivisor(2)); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? System.out : null); // create the index createIndexNoClose(!optimize, "index1", writer); writer.flush(false, true); @@ -352,7 +351,7 @@ // reopen the writer to verify the delete made it to the directory writer = new IndexWriter(dir1, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? 
System.out : null); IndexReader w2r1 = writer.getReader(); assertEquals(0, count(new Term("id", id10), w2r1)); w2r1.close(); @@ -368,7 +367,7 @@ IndexWriter mainWriter = new IndexWriter(mainDir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())); _TestUtil.reduceOpenFiles(mainWriter); - mainWriter.setInfoStream(infoStream); + mainWriter.setInfoStream(VERBOSE ? System.out : null); AddDirectoriesThreads addDirThreads = new AddDirectoriesThreads(numIter, mainWriter); addDirThreads.launchThreads(numDirs); addDirThreads.joinThreads(); @@ -520,7 +519,7 @@ public void doTestIndexWriterReopenSegment(boolean optimize) throws Exception { Directory dir1 = newDirectory(); IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? System.out : null); IndexReader r1 = writer.getReader(); assertEquals(0, r1.maxDoc()); createIndexNoClose(false, "index1", writer); @@ -640,7 +639,7 @@ setMergeScheduler(new ConcurrentMergeScheduler()). setMergePolicy(newLogMergePolicy()) ); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? System.out : null); // create the index createIndexNoClose(false, "test", writer); @@ -672,7 +671,7 @@ Directory dir1 = newDirectory(); IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setMergeScheduler(new ConcurrentMergeScheduler())); writer.commit(); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? System.out : null); // create the index createIndexNoClose(false, "test", writer); @@ -704,7 +703,7 @@ public void testAfterClose() throws Exception { Directory dir1 = newDirectory(); IndexWriter writer = new IndexWriter(dir1, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? 
System.out : null); // create the index createIndexNoClose(false, "test", writer); @@ -737,7 +736,7 @@ newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()). setMergePolicy(newLogMergePolicy(2)) ); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? System.out : null); // create the index createIndexNoClose(false, "test", writer); @@ -815,17 +814,24 @@ // Stress test reopen during add/delete public void testDuringAddDelete() throws Exception { Directory dir1 = newDirectory(); + if (VERBOSE) { + System.out.println("TEST: create init index"); + } final IndexWriter writer = new IndexWriter( dir1, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()). setMergePolicy(newLogMergePolicy(2)) ); - writer.setInfoStream(infoStream); + writer.setInfoStream(VERBOSE ? System.out : null); // create the index createIndexNoClose(false, "test", writer); writer.commit(); + if (VERBOSE) { + System.out.println("TEST: open first reader"); + } + IndexReader r = writer.getReader(); final int NUM_THREAD = 5; @@ -844,6 +850,9 @@ int count = 0; do { try { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": TEST: cycle indexing"); + } for(int docUpto=0;docUpto<10;docUpto++) { writer.addDocument(createDocument(10*count+docUpto, "test", 4)); } @@ -866,6 +875,9 @@ int sum = 0; while(System.currentTimeMillis() < endTime) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": TEST: cycle reopen"); + } IndexReader r2 = r.reopen(); if (r2 != r) { r.close();