Index: lucene/CHANGES.txt --- lucene/CHANGES.txt Wed May 18 14:05:02 2011 -0400 +++ lucene/CHANGES.txt Thu May 19 09:22:35 2011 -0400 @@ -520,6 +520,10 @@ * LUCENE-3071: Adding ReversePathHierarchyTokenizer, added skip parameter to PathHierarchyTokenizer (Olivier Favre via ryan) +* LUCENE-3112: Added experimental IndexWriter.add/updateDocuments, + enabling a collection of documents to be indexed, atomically, with + guaranteed sequential docIDs. (Mike McCandless) + API Changes * LUCENE-3061: IndexWriter's getNextMerge() and merge(OneMerge) are now public Index: lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java --- lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java Wed May 18 14:05:02 2011 -0400 +++ lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java Thu May 19 09:22:35 2011 -0400 @@ -45,6 +45,11 @@ * @lucene.experimental You can easily * accidentally remove segments from your index so be * careful! + * + *
<p><b>NOTE</b>: this tool is unaware of documents added + * atomically via {@link IndexWriter#addDocuments} or {@link + * IndexWriter#updateDocuments}, which means it can easily + * break up such document groups. */ public class IndexSplitter { public SegmentInfos infos; Index: lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java --- lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java Wed May 18 14:05:02 2011 -0400 +++ lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java Thu May 19 09:22:35 2011 -0400 @@ -40,6 +40,11 @@ *
<p><b>Note 2</b>: the disadvantage of this tool is that source index needs to be * read as many times as there are parts to be created, hence the name of this * tool. + + *
<p><b>NOTE</b>: this tool is unaware of documents added + * atomically via {@link IndexWriter#addDocuments} or {@link + * IndexWriter#updateDocuments}, which means it can easily + * break up such document groups. */ public class MultiPassIndexSplitter { Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Thu May 19 09:22:35 2011 -0400 @@ -274,11 +274,9 @@ flushControl.setClosed(); } - boolean updateDocument(final Document doc, final Analyzer analyzer, - final Term delTerm) throws CorruptIndexException, IOException { + private boolean preUpdate() throws CorruptIndexException, IOException { ensureOpen(); boolean maybeMerge = false; - final boolean isUpdate = delTerm != null; if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) { // Help out flushing any queued DWPTs so we can un-stall: if (infoStream != null) { @@ -303,9 +301,59 @@ message("continue indexing after helping out flushing DocumentsWriter is healthy"); } } + return maybeMerge; + } - final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), - this, doc); + private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException { + if (flushingDWPT != null) { + maybeMerge |= doFlush(flushingDWPT); + } else { + final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); + if (nextPendingFlush != null) { + maybeMerge |= doFlush(nextPendingFlush); + } + } + + return maybeMerge; + } + + boolean updateDocuments(final Iterable<Document> docs, final Analyzer analyzer, + final Term delTerm) throws CorruptIndexException, IOException { + boolean maybeMerge = preUpdate(); + + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this); + final DocumentsWriterPerThread flushingDWPT; + + try { + if (!perThread.isActive()) { + ensureOpen(); + assert false: "perThread is not active but we are still open"; + } + + final DocumentsWriterPerThread dwpt = perThread.perThread; + try { + final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm); + numDocsInRAM.addAndGet(docCount); + } finally { + if (dwpt.checkAndResetHasAborted()) { + flushControl.doOnAbort(perThread); + } + } + final boolean isUpdate = delTerm != null; + flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); + } finally { + perThread.unlock(); + } + + return postUpdate(flushingDWPT, maybeMerge); + } + + boolean updateDocument(final Document doc, final Analyzer analyzer, + final Term delTerm) throws CorruptIndexException, IOException { + + boolean maybeMerge = preUpdate(); + + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this); final DocumentsWriterPerThread flushingDWPT; try { @@ -324,20 +372,13 @@ flushControl.doOnAbort(perThread); } } + final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); } finally { perThread.unlock(); } - - if (flushingDWPT != null) { - maybeMerge |= doFlush(flushingDWPT); - } else { - final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); - if (nextPendingFlush != null) { - maybeMerge |= doFlush(nextPendingFlush); - } - } - return maybeMerge; + + return postUpdate(flushingDWPT, maybeMerge); } private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { Index: 
lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Thu May 19 09:22:35 2011 -0400 @@ -68,7 +68,7 @@ this.stallControl = new DocumentsWriterStallControl(); this.perThreadPool = documentsWriter.perThreadPool; this.flushPolicy = documentsWriter.flushPolicy; - this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;; + this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024; this.config = config; this.documentsWriter = documentsWriter; } @@ -162,8 +162,6 @@ stallControl.updateStalled(this); assert assertMemory(); } - - } synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { @@ -217,7 +215,7 @@ assert assertMemory(); // Take it out of the loop this DWPT is stale perThreadPool.replaceForFlush(state, closed); - }finally { + } finally { stallControl.updateStalled(this); } } Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Thu May 19 09:22:35 2011 -0400 @@ -104,7 +104,7 @@ // largish: doc = null; analyzer = null; - } + } } static class FlushedSegment { @@ -253,6 +253,82 @@ finishDocument(delTerm); } + public int updateDocuments(Iterable<Document> docs, Analyzer analyzer, Term delTerm) throws IOException { + assert writer.testPoint("DocumentsWriterPerThread addDocuments start"); + assert deleteQueue != null; + docState.analyzer = analyzer; + if (segment == null) { + // this call is synchronized on IndexWriter.segmentInfos + segment = writer.newSegmentName(); + assert numDocsInRAM == 0; + } + + int docCount = 0; + try { + for(Document doc : docs) { + docState.doc = doc; + docState.docID = numDocsInRAM; + docCount++; + + boolean success = false; + try { + consumer.processDocument(fieldInfos); + success = true; + } finally { + if (!success) { + // An exc is being thrown... + + if (!aborting) { + // One of the documents hit a non-aborting + // exception (eg something happened during + // analysis). We now go and mark any docs + // from this batch that we had already indexed + // as deleted: + int docID = docState.docID; + final int endDocID = docID - docCount; + while (docID > endDocID) { + deleteDocID(docID); + docID--; + } + + // Incr here because finishDocument will not + // be called (because an exc is being thrown): + numDocsInRAM++; + fieldInfos.revertUncommitted(); + } else { + abort(); + } + } + } + success = false; + try { + consumer.finishDocument(); + success = true; + } finally { + if (!success) { + abort(); + } + } + + finishDocument(null); + } + + // Apply delTerm only after all indexing has + // succeeded, but apply it only to docs prior to when + // this batch started: + if (delTerm != null) { + deleteQueue.add(delTerm, deleteSlice); + assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; + deleteSlice.apply(pendingDeletes, numDocsInRAM-docCount); + } + + } finally { + docState.clear(); + } + + return docCount; + } + private void finishDocument(Term delTerm) throws IOException { /* * here we actually finish the document in two steps 1. 
push the delete into Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Thu May 19 09:22:35 2011 -0400 @@ -19,7 +19,6 @@ import java.util.Iterator; import java.util.concurrent.locks.ReentrantLock; -import org.apache.lucene.document.Document; import org.apache.lucene.index.FieldInfos.FieldNumberBiMap; import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder; import org.apache.lucene.index.codecs.CodecProvider; @@ -212,7 +211,7 @@ // don't recycle DWPT by default } - public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc); + public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter); /** * Returns an iterator providing access to all {@link ThreadState} Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java --- lucene/src/java/org/apache/lucene/index/IndexWriter.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/IndexWriter.java Thu May 19 09:22:35 2011 -0400 @@ -1228,6 +1228,111 @@ } /** + * Adds multiple documents with sequentially + * assigned document IDs, atomically, such that an + * external reader will see all or none of the documents. + * + *
<p><b>WARNING</b>: the index does not currently record + * which block of documents were added atomically. Today, this is + * fine, because merging will preserve the order of the + * documents (as long as either none or all of them are + * deleted). But it's possible in the future that Lucene + * may more aggressively re-order documents (for example, + * perhaps to obtain better index compression), in which + * case you may need to fully re-index your documents at + * that time. + *
<p>See {@link #addDocument(Document)} for details on + * index and IndexWriter state after an Exception, and + * flushing/merging temporary free space requirements.</p>
+ * + * <p><b>NOTE</b>: tools that do offline splitting of an index + * (for example, IndexSplitter in contrib) or + * re-sorting of documents (for example, IndexSorter in + * contrib) are not aware of these atomically added documents + * and will likely break them up. Use such tools at your + * own risk! + *
<p><b>NOTE</b>: if this method hits an OutOfMemoryError + * you should immediately close the writer. See <a href="#OOME">above</a> for details.</p>
+ * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void addDocuments(Iterable<Document> docs) throws CorruptIndexException, IOException { + addDocuments(docs, analyzer); + } + + /** + * Adds multiple documents, using the provided analyzer, + * with sequentially assigned document IDs, atomically, + * such that an external reader will see all or none of + * the documents. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void addDocuments(Iterable<Document> docs, Analyzer analyzer) throws CorruptIndexException, IOException { + updateDocuments(null, docs, analyzer); + } + + /** + * Deletes all documents matching the provided + * delTerm and adds multiple documents with sequentially + * assigned document IDs, atomically, such that an + * external reader will see all or none of the documents. + * + * See {@link #addDocuments(Iterable)}. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void updateDocuments(Term delTerm, Iterable<Document> docs) throws CorruptIndexException, IOException { + updateDocuments(delTerm, docs, analyzer); + } + + /** + * Deletes all documents matching the provided + * delTerm and adds multiple documents, using the provided analyzer, + * with sequentially assigned document IDs, atomically, + * such that an external reader will see all or none of + * the documents. + * See {@link #addDocuments(Iterable)}. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void updateDocuments(Term delTerm, Iterable<Document> docs, Analyzer analyzer) throws CorruptIndexException, IOException { + ensureOpen(); + try { + boolean success = false; + boolean anySegmentFlushed = false; + try { + anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm); + success = true; + } finally { + if (!success && infoStream != null) { + message("hit exception updating document"); + } + } + if (anySegmentFlushed) { + maybeMerge(); + } + } catch (OutOfMemoryError oom) { + handleOOM(oom, "updateDocuments"); + } + } + + /** * Deletes the document(s) containing term. * *
<p><b>NOTE</b>: if this method hits an OutOfMemoryError Index: lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java --- lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Thu May 19 09:22:35 2011 -0400 @@ -139,7 +139,7 @@ } } - assert lastDocID == docState.docID; + assert lastDocID == docState.docID: "lastDocID=" + lastDocID + " docState.docID=" + docState.docID; lastDocID++; Index: lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java --- lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Thu May 19 09:22:35 2011 -0400 @@ -18,7 +18,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.lucene.document.Document; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc /** @@ -48,12 +47,10 @@ } @Override - public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) { + public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) { ThreadState threadState = threadBindings.get(requestingThread); - if (threadState != null) { - if (threadState.tryLock()) { - return threadState; - } + if (threadState != null && threadState.tryLock()) { + return threadState; } ThreadState minThreadState = null; Index: lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java --- lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java Thu May 19 09:22:35 2011 -0400 @@ -19,6 +19,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Iterator; import java.util.Random; import org.apache.lucene.analysis.Analyzer; @@ -97,8 +98,43 @@ * Adds a Document. * @see IndexWriter#addDocument(Document) */ - public void addDocument(Document doc) throws IOException { - w.addDocument(doc); + public void addDocument(final Document doc) throws IOException { + if (r.nextInt(5) == 3) { + // TODO: maybe, we should simply buffer up added docs + // (but we need to clone them), and only when + // getReader, commit, etc. are called, we do an + // addDocuments? Would be better testing. + w.addDocuments(new Iterable<Document>() { + + // @Override -- not until Java 1.6 + public Iterator<Document> iterator() { + return new Iterator<Document>() { + boolean done; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return !done; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + + // @Override -- not until Java 1.6 + public Document next() { + if (done) { + throw new IllegalStateException(); + } + done = true; + return doc; + } + }; + } + }); + } else { + w.addDocument(doc); + } maybeCommit(); } @@ -116,12 +152,53 @@ } } + public void addDocuments(Iterable<Document> docs) throws IOException { + w.addDocuments(docs); + maybeCommit(); + } + + public void updateDocuments(Term delTerm, Iterable<Document> docs) throws IOException { + w.updateDocuments(delTerm, docs); + maybeCommit(); + } + /** * Updates a document. 
* @see IndexWriter#updateDocument(Term, Document) */ - public void updateDocument(Term t, Document doc) throws IOException { - w.updateDocument(t, doc); + public void updateDocument(Term t, final Document doc) throws IOException { + if (r.nextInt(5) == 3) { + w.updateDocuments(t, new Iterable<Document>() { + + // @Override -- not until Java 1.6 + public Iterator<Document> iterator() { + return new Iterator<Document>() { + boolean done; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return !done; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + + // @Override -- not until Java 1.6 + public Document next() { + if (done) { + throw new IllegalStateException(); + } + done = true; + return doc; + } + }; + } + }); + } else { + w.updateDocument(t, doc); + } maybeCommit(); } Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java --- lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Wed May 18 14:05:02 2011 -0400 +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Thu May 19 09:22:35 2011 -0400 @@ -17,24 +17,16 @@ * limitations under the License. */ -import java.util.ArrayList; -import java.util.List; -import java.util.Random; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; import java.io.Reader; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util._TestUtil; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.MockDirectoryWrapper; -import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockTokenizer; @@ -43,9 +35,54 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util._TestUtil; public class TestIndexWriterExceptions extends LuceneTestCase { + private static class DocCopyIterator implements Iterable<Document> { + private final Document doc; + private final int count; + + public DocCopyIterator(Document doc, int count) { + this.count = count; + this.doc = doc; + } + + // @Override -- not until Java 1.6 + public Iterator<Document> iterator() { + return new Iterator<Document>() { + int upto; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return upto < count; + } + + // @Override -- not until Java 1.6 + public Document next() { + upto++; + return doc; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + private class IndexerThread 
extends Thread { IndexWriter writer; @@ -87,7 +124,11 @@ idField.setValue(id); Term idTerm = new Term("id", id); try { - writer.updateDocument(idTerm, doc); + if (r.nextBoolean()) { + writer.updateDocuments(idTerm, new DocCopyIterator(doc, _TestUtil.nextInt(r, 1, 20))); + } else { + writer.updateDocument(idTerm, doc); + } } catch (RuntimeException re) { if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": EXC: "); @@ -136,7 +177,7 @@ @Override boolean testPoint(String name) { - if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(20) == 17) { + if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(40) == 17) { if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": NOW FAIL: " + name); new Throwable().printStackTrace(System.out); @@ -267,6 +308,8 @@ } } + private static String CRASH_FAIL_MESSAGE = "I'm experiencing problems"; + private class CrashingFilter extends TokenFilter { String fieldName; int count; @@ -279,7 +322,7 @@ @Override public boolean incrementToken() throws IOException { if (this.fieldName.equals("crash") && count++ >= 4) - throw new IOException("I'm experiencing problems"); + throw new IOException(CRASH_FAIL_MESSAGE); return input.incrementToken(); } @@ -1278,4 +1321,141 @@ } } } + + public void testAddDocsNonAbortingException() throws Exception { + final Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random, dir); + final int numDocs1 = random.nextInt(25); + for(int docCount=0;docCount docs = new ArrayList(); + for(int docCount=0;docCount<7;docCount++) { + Document doc = new Document(); + docs.add(doc); + doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED)); + doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED)); + if (docCount == 4) { + Field f = newField("crash", "", Field.Index.ANALYZED); + doc.add(f); + MockTokenizer tokenizer = new MockTokenizer(new StringReader("crash me on the 4th token"), MockTokenizer.WHITESPACE, false); + tokenizer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases. + f.setTokenStream(new CrashingFilter("crash", tokenizer)); + } + } + try { + w.addDocuments(docs); + // BUG: CrashingFilter didn't + fail("did not hit expected exception"); + } catch (IOException ioe) { + // expected + assertEquals(CRASH_FAIL_MESSAGE, ioe.getMessage()); + } + + final int numDocs2 = random.nextInt(25); + for(int docCount=0;docCount docs = new ArrayList(); + final int numDocs2 = random.nextInt(25); + for(int docCount=0;docCount subIDs; + public boolean deleted; + + public SubDocs(String packID, List subIDs) { + this.packID = packID; + this.subIDs = subIDs; + } + } + + // TODO: is there a pre-existing way to do this!!! + private Document cloneDoc(Document doc1) { + final Document doc2 = new Document(); + for(Fieldable f : doc1.getFields()) { + Field field1 = (Field) f; + + Field field2 = new Field(field1.name(), + field1.stringValue(), + field1.isStored() ? Field.Store.YES : Field.Store.NO, + field1.isIndexed() ? (field1.isTokenized() ? 
Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO); + if (field1.getOmitNorms()) { + field2.setOmitNorms(true); + } + if (field1.getOmitTermFreqAndPositions()) { + field2.setOmitTermFreqAndPositions(true); + } + doc2.add(field2); + } + + return doc2; + } + @Test public void testNRTThreads() throws Exception { @@ -106,13 +141,16 @@ final int NUM_INDEX_THREADS = 2; final int NUM_SEARCH_THREADS = 3; + final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : 5; final AtomicBoolean failed = new AtomicBoolean(); final AtomicInteger addCount = new AtomicInteger(); final AtomicInteger delCount = new AtomicInteger(); + final AtomicInteger packCount = new AtomicInteger(); final Set delIDs = Collections.synchronizedSet(new HashSet()); + final List allSubDocs = Collections.synchronizedList(new ArrayList()); final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; Thread[] threads = new Thread[NUM_INDEX_THREADS]; @@ -120,7 +158,9 @@ threads[thread] = new Thread() { @Override public void run() { + // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works: final List toDeleteIDs = new ArrayList(); + final List toDeleteSubDocs = new ArrayList(); while(System.currentTimeMillis() < stopTime && !failed.get()) { try { Document doc = docs.nextDoc(); @@ -138,7 +178,92 @@ if (VERBOSE) { //System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid")); } - writer.addDocument(doc); + + if (random.nextBoolean()) { + // Add a pack of adjacent sub-docs + final String packID; + final SubDocs delSubDocs; + if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) { + delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size())); + assert !delSubDocs.deleted; + toDeleteSubDocs.remove(delSubDocs); + // reuse prior packID + packID = delSubDocs.packID; + } else { + delSubDocs = null; + // make new packID + packID = packCount.getAndIncrement() + ""; + } + + final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED); + final List docIDs = new ArrayList(); + final SubDocs subDocs = new SubDocs(packID, docIDs); + final List docsList = new ArrayList(); + + allSubDocs.add(subDocs); + doc.add(packIDField); + docsList.add(cloneDoc(doc)); + docIDs.add(doc.get("docid")); + + final int maxDocCount = _TestUtil.nextInt(random, 1, 10); + while(docsList.size() < maxDocCount) { + doc = docs.nextDoc(); + if (doc == null) { + break; + } + docsList.add(cloneDoc(doc)); + docIDs.add(doc.get("docid")); + } + addCount.addAndGet(docsList.size()); + + if (delSubDocs != null) { + delSubDocs.deleted = true; + delIDs.addAll(delSubDocs.subIDs); + delCount.addAndGet(delSubDocs.subIDs.size()); + if (VERBOSE) { + System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs); + } + writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList); + /* + // non-atomic: + writer.deleteDocuments(new Term("packID", delSubDocs.packID)); + for(Document subDoc : docsList) { + writer.addDocument(subDoc); + } + */ + } else { + if (VERBOSE) { + System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs); + } + writer.addDocuments(docsList); + + /* + // non-atomic: + for(Document subDoc : docsList) { + writer.addDocument(subDoc); + } + */ + } + doc.removeField("packID"); + + if (random.nextInt(5) == 2) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" 
+ packID); + } + toDeleteSubDocs.add(subDocs); + } + + } else { + writer.addDocument(doc); + addCount.getAndIncrement(); + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(doc.get("docid")); + } + } } else { // we use update but it never replaces a // prior doc @@ -146,14 +271,17 @@ //System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid")); } writer.updateDocument(new Term("docid", doc.get("docid")), doc); + addCount.getAndIncrement(); + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(doc.get("docid")); + } } - if (random.nextInt(5) == 3) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); - } - toDeleteIDs.add(doc.get("docid")); - } - if (random.nextInt(50) == 17) { + + if (random.nextInt(30) == 17) { if (VERBOSE) { //System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); } @@ -169,8 +297,19 @@ } delIDs.addAll(toDeleteIDs); toDeleteIDs.clear(); + + for(SubDocs subDocs : toDeleteSubDocs) { + assert !subDocs.deleted; + writer.deleteDocuments(new Term("packID", subDocs.packID)); + subDocs.deleted = true; + if (VERBOSE) { + System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID); + } + delIDs.addAll(subDocs.subIDs); + delCount.addAndGet(subDocs.subIDs.size()); + } + toDeleteSubDocs.clear(); } - addCount.getAndIncrement(); if (addedField != null) { doc.removeField(addedField); } @@ -341,7 +480,7 @@ if (VERBOSE) { System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount); } - + final IndexReader r2 = writer.getReader(); final IndexSearcher s = newSearcher(r2); boolean doFail = false; @@ -352,6 +491,43 @@ doFail = true; } } + + // Make sure each group of sub-docs are still in docID order: + for(SubDocs subDocs : allSubDocs) { + if (!subDocs.deleted) { + // We sort by relevance but the scores should be identical so sort falls back to by docID: + TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20); + assertEquals(subDocs.subIDs.size(), hits.totalHits); + int lastDocID = -1; + int startDocID = -1; + for(ScoreDoc scoreDoc : hits.scoreDocs) { + final int docID = scoreDoc.doc; + if (lastDocID != -1) { + assertEquals(1+lastDocID, docID); + } else { + startDocID = docID; + } + lastDocID = docID; + final Document doc = s.doc(docID); + assertEquals(subDocs.packID, doc.get("packID")); + } + + lastDocID = startDocID - 1; + for(String subID : subDocs.subIDs) { + hits = s.search(new TermQuery(new Term("docid", subID)), 1); + assertEquals(1, hits.totalHits); + final int docID = hits.scoreDocs[0].doc; + if (lastDocID != -1) { + assertEquals(1+lastDocID, docID); + } + lastDocID = docID; + } + } else { + for(String subID : subDocs.subIDs) { + assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits); + } + } + } final int endID = Integer.parseInt(docs.nextDoc().get("docid")); for(int id=0;id
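
A few sketches follow to illustrate the patch. First, minimal usage of the new block-add API. Only addDocuments/updateDocuments and their atomicity guarantees come from the patch; the RAMDirectory, StandardAnalyzer, the Version constant (LUCENE_32 assumes the 3.2-era API), and the "packID"/"body" field names are invented stand-ins.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.RAMDirectory;
    import org.apache.lucene.util.Version;

    public class AddDocumentsDemo {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir,
            new IndexWriterConfig(Version.LUCENE_32,
                                  new StandardAnalyzer(Version.LUCENE_32)));

        // Build a block: every document carries the same (hypothetical)
        // "packID" key so the whole group can later be replaced at once.
        List<Document> block = new ArrayList<Document>();
        for(int i=0;i<3;i++) {
          Document doc = new Document();
          doc.add(new Field("packID", "pack17", Field.Store.YES, Field.Index.NOT_ANALYZED));
          doc.add(new Field("body", "child " + i, Field.Store.NO, Field.Index.ANALYZED));
          block.add(doc);
        }

        // The three documents receive adjacent docIDs; a reader opened
        // at any point sees either none or all of them.
        writer.addDocuments(block);

        // Atomic replace: the delete term is applied only to documents
        // that existed before this block, so the incoming block (which
        // matches the same term) is not deleted by its own update.
        writer.updateDocuments(new Term("packID", "pack17"), block);

        writer.close();
        dir.close();
      }
    }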
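The rollback arithmetic in DocumentsWriterPerThread.updateDocuments is compact enough to misread, so here it is worked through with invented numbers; the loop shape mirrors the patch, the values are hypothetical.

    public class RollbackArithmetic {
      public static void main(String[] args) {
        // Suppose the in-memory segment holds 100 docs and the 3rd doc
        // of a block hits a non-aborting exception (e.g. during analysis).
        final int docsBeforeBlock = 100;
        final int docCount = 3;                      // includes the failing doc
        int docID = docsBeforeBlock + docCount - 1;  // 102 == docState.docID of the failing doc
        final int endDocID = docID - docCount;       // 99 == last docID NOT in this block

        // Same loop shape as the patch: marks 102, 101, 100 deleted --
        // the partially indexed docs plus the failing one -- leaving all
        // pre-block documents (docID <= 99) untouched.
        while (docID > endDocID) {
          System.out.println("deleteDocID(" + docID + ")");
          docID--;
        }
      }
    }

The same bookkeeping explains the deleteSlice.apply(pendingDeletes, numDocsInRAM-docCount) call on the success path: once the whole block has been indexed, numDocsInRAM-docCount is the docID where the block started, so the queued delTerm can only delete documents that existed before the block, which is why an updateDocuments call whose new documents match delTerm does not delete its own block.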
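RandomIndexWriter routes a random fifth of single-document adds through the new block code path by hand-rolling a one-shot Iterable. A sketch of a shorter equivalent, assuming nothing beyond java.util; the helper name is hypothetical.

    import java.io.IOException;
    import java.util.Collections;
    import java.util.Random;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexWriter;

    class SingletonBlockAdd {
      // Mirrors RandomIndexWriter.addDocument: occasionally exercise the
      // block path by indexing a one-document block.
      static void addMaybeAsBlock(IndexWriter w, Random r, Document doc) throws IOException {
        if (r.nextInt(5) == 3) {
          w.addDocuments(Collections.singletonList(doc)); // one-element block
        } else {
          w.addDocument(doc);                             // ordinary single-doc path
        }
      }
    }

The patch's hand-rolled iterator is deliberately one-shot (a done flag), while a singleton list is re-iterable; either satisfies addDocuments, which iterates the collection exactly once.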
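The new TestNRTThreads assertions encode the core invariant of the feature: all documents of a pack keep adjacent, increasing docIDs across flushes and merges. A condensed sketch of that check; the searcher and the "packID" field come from the surrounding test, and the scores tie, so the relevance sort falls back to docID order.

    import java.io.IOException;

    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TermQuery;
    import org.apache.lucene.search.TopDocs;

    class PackAdjacencyCheck {
      static void assertAdjacent(IndexSearcher s, String packID, int expectedCount) throws IOException {
        TopDocs hits = s.search(new TermQuery(new Term("packID", packID)), 20);
        if (hits.totalHits != expectedCount) {
          throw new AssertionError("pack " + packID + " lost documents");
        }
        int lastDocID = -1;
        for (ScoreDoc sd : hits.scoreDocs) {
          // each hit must directly follow its predecessor
          if (lastDocID != -1 && sd.doc != lastDocID + 1) {
            throw new AssertionError("pack " + packID + " was broken up at docID " + sd.doc);
          }
          lastDocID = sd.doc;
        }
      }
    }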
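Finally, the commented-out "non-atomic" variants left in TestNRTThreads make a useful contrast. A sketch of the two ways to replace a pack, under the same assumptions as the test (writer, packID, and docsList come from its surrounding code):

    import java.io.IOException;
    import java.util.List;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;

    class AtomicVsNonAtomic {
      static void replacePack(IndexWriter writer, String packID, List<Document> docsList) throws IOException {
        // Non-atomic (the commented-out variant in the test): an NRT
        // reader opened between the delete and the last add can see the
        // pack half-replaced.
        //   writer.deleteDocuments(new Term("packID", packID));
        //   for (Document subDoc : docsList) writer.addDocument(subDoc);

        // Atomic: readers see the old pack or the new pack, never a
        // mixture, and the new documents receive sequential docIDs.
        writer.updateDocuments(new Term("packID", packID), docsList);
      }
    }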