Index: lucene/CHANGES.txt --- lucene/CHANGES.txt Fri May 20 05:12:17 2011 -0400 +++ lucene/CHANGES.txt Sun May 22 08:30:46 2011 -0400 @@ -486,6 +486,10 @@ document IDs and scores encountered during the search, and "replay" them to another Collector. (Mike McCandless, Shai Erera) +* LUCENE-3112: Added experimental IndexWriter.add/updateDocuments, + enabling a collection of documents to be indexed, atomically, with + guaranteed sequential docIDs. (Mike McCandless) + API Changes * LUCENE-3061: IndexWriter's getNextMerge() and merge(OneMerge) are now public Index: lucene/contrib/CHANGES.txt --- lucene/contrib/CHANGES.txt Fri May 20 05:12:17 2011 -0400 +++ lucene/contrib/CHANGES.txt Sun May 22 08:30:46 2011 -0400 @@ -100,6 +100,11 @@ case where the indexing rate is lowish but the reopen rate is highish, to take load off the IO system. (Mike McCandless) + * LUCENE-3129: Added DocBlockGroupingCollector, a single pass + grouping collector which is faster than the two-pass approach, but + requires that every document sharing the same group was indexed as + a doc block (IndexWriter.add/updateDocuments). (Mike McCandless) + Optimizations * LUCENE-3040: Switch all analysis consumers (highlighter, morelikethis, memory, ...) Index: lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java --- lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java Fri May 20 05:12:17 2011 -0400 +++ lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java Sun May 22 08:30:46 2011 -0400 @@ -45,6 +45,11 @@ * @lucene.experimental You can easily * accidentally remove segments from your index so be * careful! + * + *

NOTE: this tool is unaware of documents added + * atomically via {@link IndexWriter#addDocuments} or {@link + * IndexWriter#updateDocuments}, which means it can easily + * break up such document groups. */ public class IndexSplitter { public SegmentInfos infos; Index: lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java --- lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java Fri May 20 05:12:17 2011 -0400 +++ lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java Sun May 22 08:30:46 2011 -0400 @@ -40,6 +40,11 @@ *

Note 2: the disadvantage of this tool is that source index needs to be * read as many times as there are parts to be created, hence the name of this * tool. + * + *

NOTE: this tool is unaware of documents added + * atomically via {@link IndexWriter#addDocuments} or {@link + * IndexWriter#updateDocuments}, which means it can easily + * break up such document groups. */ public class MultiPassIndexSplitter { Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Sun May 22 08:30:46 2011 -0400 @@ -274,11 +274,9 @@ flushControl.setClosed(); } - boolean updateDocument(final Document doc, final Analyzer analyzer, - final Term delTerm) throws CorruptIndexException, IOException { + private boolean preUpdate() throws CorruptIndexException, IOException { ensureOpen(); boolean maybeMerge = false; - final boolean isUpdate = delTerm != null; if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) { // Help out flushing any queued DWPTs so we can un-stall: if (infoStream != null) { @@ -303,9 +301,59 @@ message("continue indexing after helpling out flushing DocumentsWriter is healthy"); } } + return maybeMerge; + } - final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), - this, doc); + private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException { + if (flushingDWPT != null) { + maybeMerge |= doFlush(flushingDWPT); + } else { + final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); + if (nextPendingFlush != null) { + maybeMerge |= doFlush(nextPendingFlush); + } + } + + return maybeMerge; + } + + boolean updateDocuments(final Iterable docs, final Analyzer analyzer, + final Term delTerm) throws CorruptIndexException, IOException { + boolean maybeMerge = preUpdate(); + + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this); + final DocumentsWriterPerThread flushingDWPT; + + try { + if (!perThread.isActive()) { + ensureOpen(); + assert false: "perThread is not active but we are still open"; + } + + final DocumentsWriterPerThread dwpt = perThread.perThread; + try { + final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm); + numDocsInRAM.addAndGet(docCount); + } finally { + if (dwpt.checkAndResetHasAborted()) { + flushControl.doOnAbort(perThread); + } + } + final boolean isUpdate = delTerm != null; + flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); + } finally { + perThread.unlock(); + } + + return postUpdate(flushingDWPT, maybeMerge); + } + + boolean updateDocument(final Document doc, final Analyzer analyzer, + final Term delTerm) throws CorruptIndexException, IOException { + + boolean maybeMerge = preUpdate(); + + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this); final DocumentsWriterPerThread flushingDWPT; try { @@ -324,20 +372,13 @@ flushControl.doOnAbort(perThread); } } + final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); } finally { perThread.unlock(); } - - if (flushingDWPT != null) { - maybeMerge |= doFlush(flushingDWPT); - } else { - final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); - if (nextPendingFlush != null) { - maybeMerge |= doFlush(nextPendingFlush); - } - } - return maybeMerge; + + return postUpdate(flushingDWPT, maybeMerge); } private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { Index: 
lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Sun May 22 08:30:46 2011 -0400 @@ -68,7 +68,7 @@ this.stallControl = new DocumentsWriterStallControl(); this.perThreadPool = documentsWriter.perThreadPool; this.flushPolicy = documentsWriter.flushPolicy; - this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;; + this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024; this.config = config; this.documentsWriter = documentsWriter; } @@ -162,8 +162,6 @@ stallControl.updateStalled(this); assert assertMemory(); } - - } synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { @@ -217,7 +215,7 @@ assert assertMemory(); // Take it out of the loop this DWPT is stale perThreadPool.replaceForFlush(state, closed); - }finally { + } finally { stallControl.updateStalled(this); } } Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Sun May 22 08:30:46 2011 -0400 @@ -104,7 +104,7 @@ // largish: doc = null; analyzer = null; - } + } } static class FlushedSegment { @@ -253,6 +253,82 @@ finishDocument(delTerm); } + public int updateDocuments(Iterable docs, Analyzer analyzer, Term delTerm) throws IOException { + assert writer.testPoint("DocumentsWriterPerThread addDocuments start"); + assert deleteQueue != null; + docState.analyzer = analyzer; + if (segment == null) { + // this call is synchronized on IndexWriter.segmentInfos + segment = writer.newSegmentName(); + assert numDocsInRAM == 0; + } + + int docCount = 0; + try { + for(Document doc : docs) { + docState.doc = doc; + docState.docID = numDocsInRAM; + docCount++; + + boolean success = false; + try { + consumer.processDocument(fieldInfos); + success = true; + } finally { + if (!success) { + // An exc is being thrown... + + if (!aborting) { + // One of the documents hit a non-aborting + // exception (eg something happened during + // analysis). We now go and mark any docs + // from this batch that we had already indexed + // as deleted: + int docID = docState.docID; + final int endDocID = docID - docCount; + while (docID > endDocID) { + deleteDocID(docID); + docID--; + } + + // Incr here because finishDocument will not + // be called (because an exc is being thrown): + numDocsInRAM++; + fieldInfos.revertUncommitted(); + } else { + abort(); + } + } + } + success = false; + try { + consumer.finishDocument(); + success = true; + } finally { + if (!success) { + abort(); + } + } + + finishDocument(null); + } + + // Apply delTerm only after all indexing has + // succeeded, but apply it only to docs prior to when + // this batch started: + if (delTerm != null) { + deleteQueue.add(delTerm, deleteSlice); + assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; + deleteSlice.apply(pendingDeletes, numDocsInRAM-docCount); + } + + } finally { + docState.clear(); + } + + return docCount; + } + private void finishDocument(Term delTerm) throws IOException { /* * here we actually finish the document in two steps 1. 
push the delete into Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Sun May 22 08:30:46 2011 -0400 @@ -19,7 +19,6 @@ import java.util.Iterator; import java.util.concurrent.locks.ReentrantLock; -import org.apache.lucene.document.Document; import org.apache.lucene.index.FieldInfos.FieldNumberBiMap; import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder; import org.apache.lucene.index.codecs.CodecProvider; @@ -212,7 +211,7 @@ // don't recycle DWPT by default } - public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc); + public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter); /** * Returns an iterator providing access to all {@link ThreadState} Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java --- lucene/src/java/org/apache/lucene/index/IndexWriter.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/IndexWriter.java Sun May 22 08:30:46 2011 -0400 @@ -1228,6 +1228,111 @@ } /** + * Adds multiple documents with sequentially + * assigned document IDs, atomically, such that an + * external reader will see all or none of the documents. + * + *

WARNING: the index does not currently record + * which block of documents were added atomically. Today, this is + * fine, because merging will preserve the order of the + * documents (as long as either none or all of them are + * deleted). But it's possible in the future that Lucene + * may more aggressively re-order documents (for example, + * perhaps to obtain better index compression), in which + * case you may need to fully re-index your documents at + * that time. + * + *

See {@link #addDocument(Document)} for details on + * index and IndexWriter state after an Exception, and + * flushing/merging temporary free space requirements.

+ * + *

NOTE: tools that do offline splitting of an index + * (for example, IndexSplitter in contrib) or + * re-sorting of documents (for example, IndexSorter in + * contrib) are not aware of these atomically added documents + * and will likely break them up. Use such tools at your + * own risk! + * + *

NOTE: if this method hits an OutOfMemoryError + * you should immediately close the writer. See above for details.

+ * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void addDocuments(Iterable docs) throws CorruptIndexException, IOException { + addDocuments(docs, analyzer); + } + + /** + * Adds multiple documents, using the provided analyzer, + * with sequentially assigned document IDs, atomically, + * such that an external reader will see all or none of + * the documents. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void addDocuments(Iterable docs, Analyzer analyzer) throws CorruptIndexException, IOException { + updateDocuments(null, docs, analyzer); + } + + /** + * Deletes all documents matching the provided + * delTerm and adds multiple documents with sequentially + * assigned document IDs, atomically, such that an + * external reader will see all or none of the documents. + * + * See {@link #addDocuments(Iterable)}. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void updateDocuments(Term delTerm, Iterable docs) throws CorruptIndexException, IOException { + updateDocuments(delTerm, docs, analyzer); + } + + /** + * Deletes all documents matching the provided + * delTerm and adds multiple documents, using the provided analyzer, + * with sequentially assigned document IDs, atomically, + * such that an external reader will see all or none of + * the documents. + * See {@link #addDocuments(Iterable)}. + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void updateDocuments(Term delTerm, Iterable docs, Analyzer analyzer) throws CorruptIndexException, IOException { + ensureOpen(); + try { + boolean success = false; + boolean anySegmentFlushed = false; + try { + anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm); + success = true; + } finally { + if (!success && infoStream != null) { + message("hit exception updating document"); + } + } + if (anySegmentFlushed) { + maybeMerge(); + } + } catch (OutOfMemoryError oom) { + handleOOM(oom, "updateDocuments"); + } + } + + /** * Deletes the document(s) containing term. * *

NOTE: if this method hits an OutOfMemoryError Index: lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java --- lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Sun May 22 08:30:46 2011 -0400 @@ -139,7 +139,7 @@ } } - assert lastDocID == docState.docID; + assert lastDocID == docState.docID: "lastDocID=" + lastDocID + " docState.docID=" + docState.docID; lastDocID++; Index: lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java --- lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Sun May 22 08:30:46 2011 -0400 @@ -18,7 +18,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.lucene.document.Document; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc /** @@ -48,12 +47,10 @@ } @Override - public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) { + public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) { ThreadState threadState = threadBindings.get(requestingThread); - if (threadState != null) { - if (threadState.tryLock()) { - return threadState; - } + if (threadState != null && threadState.tryLock()) { + return threadState; } ThreadState minThreadState = null; Index: lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java --- lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java Sun May 22 08:30:46 2011 -0400 @@ -19,6 +19,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Iterator; import java.util.Random; import org.apache.lucene.analysis.Analyzer; @@ -97,8 +98,43 @@ * Adds a Document. * @see IndexWriter#addDocument(Document) */ - public void addDocument(Document doc) throws IOException { - w.addDocument(doc); + public void addDocument(final Document doc) throws IOException { + if (r.nextInt(5) == 3) { + // TODO: maybe, we should simply buffer up added docs + // (but we need to clone them), and only when + // getReader, commit, etc. are called, we do an + // addDocuments? Would be better testing. + w.addDocuments(new Iterable() { + + // @Override -- not until Java 1.6 + public Iterator iterator() { + return new Iterator() { + boolean done; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return !done; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + + // @Override -- not until Java 1.6 + public Document next() { + if (done) { + throw new IllegalStateException(); + } + done = true; + return doc; + } + }; + } + }); + } else { + w.addDocument(doc); + } maybeCommit(); } @@ -116,12 +152,53 @@ } } + public void addDocuments(Iterable docs) throws IOException { + w.addDocuments(docs); + maybeCommit(); + } + + public void updateDocuments(Term delTerm, Iterable docs) throws IOException { + w.updateDocuments(delTerm, docs); + maybeCommit(); + } + /** * Updates a document. 
* @see IndexWriter#updateDocument(Term, Document) */ - public void updateDocument(Term t, Document doc) throws IOException { - w.updateDocument(t, doc); + public void updateDocument(Term t, final Document doc) throws IOException { + if (r.nextInt(5) == 3) { + w.updateDocuments(t, new Iterable() { + + // @Override -- not until Java 1.6 + public Iterator iterator() { + return new Iterator() { + boolean done; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return !done; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + + // @Override -- not until Java 1.6 + public Document next() { + if (done) { + throw new IllegalStateException(); + } + done = true; + return doc; + } + }; + } + }); + } else { + w.updateDocument(t, doc); + } maybeCommit(); } Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java --- lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Fri May 20 05:12:17 2011 -0400 +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Sun May 22 08:30:46 2011 -0400 @@ -17,24 +17,16 @@ * limitations under the License. */ -import java.util.ArrayList; -import java.util.List; -import java.util.Random; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; import java.io.Reader; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util._TestUtil; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.MockDirectoryWrapper; -import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockTokenizer; @@ -43,9 +35,54 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util._TestUtil; public class TestIndexWriterExceptions extends LuceneTestCase { + private static class DocCopyIterator implements Iterable { + private final Document doc; + private final int count; + + public DocCopyIterator(Document doc, int count) { + this.count = count; + this.doc = doc; + } + + // @Override -- not until Java 1.6 + public Iterator iterator() { + return new Iterator() { + int upto; + + // @Override -- not until Java 1.6 + public boolean hasNext() { + return upto < count; + } + + // @Override -- not until Java 1.6 + public Document next() { + upto++; + return doc; + } + + // @Override -- not until Java 1.6 + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + private class IndexerThread 
extends Thread { IndexWriter writer; @@ -87,7 +124,11 @@ idField.setValue(id); Term idTerm = new Term("id", id); try { - writer.updateDocument(idTerm, doc); + if (r.nextBoolean()) { + writer.updateDocuments(idTerm, new DocCopyIterator(doc, _TestUtil.nextInt(r, 1, 20))); + } else { + writer.updateDocument(idTerm, doc); + } } catch (RuntimeException re) { if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": EXC: "); @@ -136,7 +177,7 @@ @Override boolean testPoint(String name) { - if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(20) == 17) { + if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(40) == 17) { if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": NOW FAIL: " + name); new Throwable().printStackTrace(System.out); @@ -267,6 +308,8 @@ } } + private static String CRASH_FAIL_MESSAGE = "I'm experiencing problems"; + private class CrashingFilter extends TokenFilter { String fieldName; int count; @@ -279,7 +322,7 @@ @Override public boolean incrementToken() throws IOException { if (this.fieldName.equals("crash") && count++ >= 4) - throw new IOException("I'm experiencing problems"); + throw new IOException(CRASH_FAIL_MESSAGE); return input.incrementToken(); } @@ -1278,4 +1321,141 @@ } } } + + public void testAddDocsNonAbortingException() throws Exception { + final Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random, dir); + final int numDocs1 = random.nextInt(25); + for(int docCount=0;docCount docs = new ArrayList(); + for(int docCount=0;docCount<7;docCount++) { + Document doc = new Document(); + docs.add(doc); + doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED)); + doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED)); + if (docCount == 4) { + Field f = newField("crash", "", Field.Index.ANALYZED); + doc.add(f); + MockTokenizer tokenizer = new MockTokenizer(new StringReader("crash me on the 4th token"), MockTokenizer.WHITESPACE, false); + tokenizer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases. + f.setTokenStream(new CrashingFilter("crash", tokenizer)); + } + } + try { + w.addDocuments(docs); + // BUG: CrashingFilter didn't + fail("did not hit expected exception"); + } catch (IOException ioe) { + // expected + assertEquals(CRASH_FAIL_MESSAGE, ioe.getMessage()); + } + + final int numDocs2 = random.nextInt(25); + for(int docCount=0;docCount docs = new ArrayList(); + final int numDocs2 = random.nextInt(25); + for(int docCount=0;docCount subIDs; + public boolean deleted; + + public SubDocs(String packID, List subIDs) { + this.packID = packID; + this.subIDs = subIDs; + } + } + + // TODO: is there a pre-existing way to do this!!! + private Document cloneDoc(Document doc1) { + final Document doc2 = new Document(); + for(Fieldable f : doc1.getFields()) { + Field field1 = (Field) f; + + Field field2 = new Field(field1.name(), + field1.stringValue(), + field1.isStored() ? Field.Store.YES : Field.Store.NO, + field1.isIndexed() ? (field1.isTokenized() ? 
Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO); + if (field1.getOmitNorms()) { + field2.setOmitNorms(true); + } + if (field1.getOmitTermFreqAndPositions()) { + field2.setOmitTermFreqAndPositions(true); + } + doc2.add(field2); + } + + return doc2; + } + @Test public void testNRTThreads() throws Exception { @@ -106,13 +141,16 @@ final int NUM_INDEX_THREADS = 2; final int NUM_SEARCH_THREADS = 3; + final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : 5; final AtomicBoolean failed = new AtomicBoolean(); final AtomicInteger addCount = new AtomicInteger(); final AtomicInteger delCount = new AtomicInteger(); + final AtomicInteger packCount = new AtomicInteger(); final Set delIDs = Collections.synchronizedSet(new HashSet()); + final List allSubDocs = Collections.synchronizedList(new ArrayList()); final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; Thread[] threads = new Thread[NUM_INDEX_THREADS]; @@ -120,7 +158,9 @@ threads[thread] = new Thread() { @Override public void run() { + // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works: final List toDeleteIDs = new ArrayList(); + final List toDeleteSubDocs = new ArrayList(); while(System.currentTimeMillis() < stopTime && !failed.get()) { try { Document doc = docs.nextDoc(); @@ -138,7 +178,92 @@ if (VERBOSE) { //System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid")); } - writer.addDocument(doc); + + if (random.nextBoolean()) { + // Add a pack of adjacent sub-docs + final String packID; + final SubDocs delSubDocs; + if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) { + delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size())); + assert !delSubDocs.deleted; + toDeleteSubDocs.remove(delSubDocs); + // reuse prior packID + packID = delSubDocs.packID; + } else { + delSubDocs = null; + // make new packID + packID = packCount.getAndIncrement() + ""; + } + + final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED); + final List docIDs = new ArrayList(); + final SubDocs subDocs = new SubDocs(packID, docIDs); + final List docsList = new ArrayList(); + + allSubDocs.add(subDocs); + doc.add(packIDField); + docsList.add(cloneDoc(doc)); + docIDs.add(doc.get("docid")); + + final int maxDocCount = _TestUtil.nextInt(random, 1, 10); + while(docsList.size() < maxDocCount) { + doc = docs.nextDoc(); + if (doc == null) { + break; + } + docsList.add(cloneDoc(doc)); + docIDs.add(doc.get("docid")); + } + addCount.addAndGet(docsList.size()); + + if (delSubDocs != null) { + delSubDocs.deleted = true; + delIDs.addAll(delSubDocs.subIDs); + delCount.addAndGet(delSubDocs.subIDs.size()); + if (VERBOSE) { + System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs); + } + writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList); + /* + // non-atomic: + writer.deleteDocuments(new Term("packID", delSubDocs.packID)); + for(Document subDoc : docsList) { + writer.addDocument(subDoc); + } + */ + } else { + if (VERBOSE) { + System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs); + } + writer.addDocuments(docsList); + + /* + // non-atomic: + for(Document subDoc : docsList) { + writer.addDocument(subDoc); + } + */ + } + doc.removeField("packID"); + + if (random.nextInt(5) == 2) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" 
+ packID); + } + toDeleteSubDocs.add(subDocs); + } + + } else { + writer.addDocument(doc); + addCount.getAndIncrement(); + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(doc.get("docid")); + } + } } else { // we use update but it never replaces a // prior doc @@ -146,14 +271,17 @@ //System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid")); } writer.updateDocument(new Term("docid", doc.get("docid")), doc); + addCount.getAndIncrement(); + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(doc.get("docid")); + } } - if (random.nextInt(5) == 3) { - if (VERBOSE) { - //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); - } - toDeleteIDs.add(doc.get("docid")); - } - if (random.nextInt(50) == 17) { + + if (random.nextInt(30) == 17) { if (VERBOSE) { //System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); } @@ -169,8 +297,19 @@ } delIDs.addAll(toDeleteIDs); toDeleteIDs.clear(); + + for(SubDocs subDocs : toDeleteSubDocs) { + assert !subDocs.deleted; + writer.deleteDocuments(new Term("packID", subDocs.packID)); + subDocs.deleted = true; + if (VERBOSE) { + System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID); + } + delIDs.addAll(subDocs.subIDs); + delCount.addAndGet(subDocs.subIDs.size()); + } + toDeleteSubDocs.clear(); } - addCount.getAndIncrement(); if (addedField != null) { doc.removeField(addedField); } @@ -341,7 +480,7 @@ if (VERBOSE) { System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount); } - + final IndexReader r2 = writer.getReader(); final IndexSearcher s = newSearcher(r2); boolean doFail = false; @@ -352,6 +491,43 @@ doFail = true; } } + + // Make sure each group of sub-docs are still in docID order: + for(SubDocs subDocs : allSubDocs) { + if (!subDocs.deleted) { + // We sort by relevance but the scores should be identical so sort falls back to by docID: + TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20); + assertEquals(subDocs.subIDs.size(), hits.totalHits); + int lastDocID = -1; + int startDocID = -1; + for(ScoreDoc scoreDoc : hits.scoreDocs) { + final int docID = scoreDoc.doc; + if (lastDocID != -1) { + assertEquals(1+lastDocID, docID); + } else { + startDocID = docID; + } + lastDocID = docID; + final Document doc = s.doc(docID); + assertEquals(subDocs.packID, doc.get("packID")); + } + + lastDocID = startDocID - 1; + for(String subID : subDocs.subIDs) { + hits = s.search(new TermQuery(new Term("docid", subID)), 1); + assertEquals(1, hits.totalHits); + final int docID = hits.scoreDocs[0].doc; + if (lastDocID != -1) { + assertEquals(1+lastDocID, docID); + } + lastDocID = docID; + } + } else { + for(String subID : subDocs.subIDs) { + assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits); + } + } + } final int endID = Integer.parseInt(docs.nextDoc().get("docid")); for(int id=0;idThis results in better performance than the + * two-pass grouping collectors, with the tradeoff being + * that the documents must always be indexed as a block. + * + *

NOTE: this collector makes no effort to verify + * the docs were in fact indexed as a block, so it's up to + * you to ensure this was the case. + * + *

See {@link org.apache.lucene.search.grouping} for more + * details including a full code example.

+ * + * @lucene.experimental + */ + +public class DocBlockGroupingCollector extends Collector { + + private int[] pendingSubDocs; + private float[] pendingSubScores; + private int subDocUpto; + + private final String groupField; + private final Sort groupSort; + private final Sort withinGroupSort; + private final int topNGroups; + private final int maxDocsPerGroup; + // TODO: specialize into 2 classes, static "create" method: + private final boolean needsScores; + + private final FieldComparator[] comparators; + private final int[] reversed; + private final int compIDXEnd; + private int bottomSlot; + private boolean queueFull; + private AtomicReaderContext currentReaderContext; + + private int topGroupDoc; + private int totalHitCount; + private int docBase; + private int lastGroupOrd = -1; + private FieldCache.DocTermsIndex index; + private Scorer scorer; + private final GroupQueue groupQueue; + private boolean groupCompetes; + + private final static class FakeScorer extends Scorer { + + float score; + int doc; + + public FakeScorer() { + super((Weight) null); + } + + @Override + public float score() { + return score; + } + + @Override + public int docID() { + return doc; + } + + @Override + public int advance(int target) { + throw new UnsupportedOperationException(); + } + + @Override + public int nextDoc() { + throw new UnsupportedOperationException(); + } + } + + private static final class OneGroup { + AtomicReaderContext readerContext; + int groupOrd; + int topGroupDoc; + int[] docs; + float[] scores; + int count; + int comparatorSlot; + } + + // Sorts by groupSort. Not static -- uses comparators, reversed + private final class GroupQueue extends PriorityQueue { + + public GroupQueue(int size) { + super(size); + } + + @Override + protected boolean lessThan(final OneGroup group1, final OneGroup group2) { + + //System.out.println(" ltcheck"); + assert group1 != group2; + assert group1.comparatorSlot != group2.comparatorSlot; + + final int numComparators = comparators.length; + for (int compIDX=0;compIDX < numComparators; compIDX++) { + final int c = reversed[compIDX] * comparators[compIDX].compare(group1.comparatorSlot, group2.comparatorSlot); + if (c != 0) { + // Short circuit + return c > 0; + } + } + + // Break ties by docID; lower docID is always sorted first + return group1.topGroupDoc > group2.topGroupDoc; + } + } + + /* + private String getGroupString(int groupOrd) { + if (groupOrd == 0) { + return null; + } else { + return index.lookup(groupOrd, new BytesRef()).utf8ToString(); + } + } + */ + + // Called when we transition to another group; if the + // group is competitive we insert into the group queue + private void processGroup() { + //System.out.println(" processGroup ord=" + lastGroupOrd + " competes=" + groupCompetes + " count=" + subDocUpto + " groupDoc=" + topGroupDoc); + if (groupCompetes) { + if (!queueFull) { + // Startup transient: always add a new OneGroup + final OneGroup og = new OneGroup(); + og.count = subDocUpto; + og.topGroupDoc = docBase + topGroupDoc; + og.docs = new int[subDocUpto]; + System.arraycopy(pendingSubDocs, 0, og.docs, 0, subDocUpto); + if (needsScores) { + og.scores = new float[subDocUpto]; + System.arraycopy(pendingSubScores, 0, og.scores, 0, subDocUpto); + } + og.readerContext = currentReaderContext; + og.groupOrd = lastGroupOrd; + og.comparatorSlot = bottomSlot; + final OneGroup bottomGroup = groupQueue.add(og); + //System.out.println(" ADD group=" + getGroupString(lastGroupOrd) + " newBottom=" + getGroupString(bottomGroup.groupOrd)); + 
queueFull = groupQueue.size() == topNGroups; + if (queueFull) { + // Queue just became full; now set the real bottom + // in the comparators: + bottomSlot = bottomGroup.comparatorSlot; + //System.out.println(" set bottom=" + bottomSlot); + for (int i = 0; i < comparators.length; i++) { + comparators[i].setBottom(bottomSlot); + } + //System.out.println(" QUEUE FULL"); + } else { + // Queue not full yet -- just advance bottomSlot: + bottomSlot = groupQueue.size(); + } + } else { + // Replace bottom element in PQ and then updateTop + final OneGroup og = groupQueue.top(); + assert og != null; + og.count = subDocUpto; + og.topGroupDoc = docBase + topGroupDoc; + if (og.docs.length < subDocUpto) { + og.docs = ArrayUtil.grow(og.docs, subDocUpto); + } + System.arraycopy(pendingSubDocs, 0, og.docs, 0, subDocUpto); + if (needsScores) { + if (og.scores.length < subDocUpto) { + og.scores = ArrayUtil.grow(og.scores, subDocUpto); + } + System.arraycopy(pendingSubScores, 0, og.scores, 0, subDocUpto); + } + og.readerContext = currentReaderContext; + og.groupOrd = lastGroupOrd; + bottomSlot = groupQueue.updateTop().comparatorSlot; + + //System.out.println(" set bottom=" + bottomSlot); + for (int i = 0; i < comparators.length; i++) { + comparators[i].setBottom(bottomSlot); + } + } + } + subDocUpto = 0; + } + + /** + * Create the single pass collector. + * + * @param groupField The field used to group + * documents. This field must be single-valued and + * indexed (FieldCache is used to access its value + * per-document). + * @param groupSort The {@link Sort} used to sort the + * groups. The top sorted document within each group + * according to groupSort, determines how that group + * sorts against other groups. This must be non-null, + * ie, if you want to groupSort by relevance use + * Sort.RELEVANCE. + * @param topNGroups How many top groups to keep. + */ + public DocBlockGroupingCollector(String groupField, Sort groupSort, int topNGroups, Sort withinGroupSort, int maxDocsPerGroup, boolean needsScores) throws IOException { + + if (topNGroups < 1) { + throw new IllegalArgumentException("topNGroups must be >= 1 (got " + topNGroups + ")"); + } + + groupQueue = new GroupQueue(topNGroups); + pendingSubDocs = new int[10]; + if (needsScores) { + pendingSubScores = new float[10]; + } + + this.maxDocsPerGroup = maxDocsPerGroup; + this.withinGroupSort = withinGroupSort; + this.groupField = groupField; + this.needsScores = needsScores; + // TODO: allow null groupSort to mean "by relevance", + // and specialize it? + this.groupSort = groupSort; + + this.topNGroups = topNGroups; + + final SortField[] sortFields = groupSort.getSort(); + comparators = new FieldComparator[sortFields.length]; + compIDXEnd = comparators.length - 1; + reversed = new int[sortFields.length]; + for (int i = 0; i < sortFields.length; i++) { + final SortField sortField = sortFields[i]; + comparators[i] = sortField.getComparator(topNGroups, i); + reversed[i] = sortField.getReverse() ? -1 : 1; + } + } + + // TODO: maybe allow no sort on retrieving groups? app + // may want to simply process docs in the group itself? + // typically they will be presented as a "single" result + // in the UI? + + /** Returns the grouped results. Returns null if the + * number of groups collected is <= groupOffset. 
*/ + public TopGroups getTopGroups(int groupOffset, int withinGroupOffset, boolean fillGroupValues, boolean fillSortFields, boolean getScores, boolean getMaxScores) throws IOException { + //if (queueFull) { + //System.out.println("getTopGroups groupOffset=" + groupOffset + " topNGroups=" + topNGroups); + //} + if (subDocUpto != 0) { + processGroup(); + lastGroupOrd = -1; + } + if (groupOffset >= groupQueue.size()) { + return null; + } + int totalGroupedHitCount = 0; + + final FakeScorer fakeScorer = new FakeScorer(); + + final GroupDocs[] groups = new GroupDocs[groupQueue.size() - groupOffset]; + for(int downTo=groupQueue.size()-groupOffset-1;downTo>=0;downTo--) { + final OneGroup og = groupQueue.pop(); + + // At this point we hold all docs w/ in each group, + // unsorted; we now sort them: + final TopDocsCollector collector; + if (withinGroupSort == null) { + // Sort by score + collector = TopScoreDocCollector.create(maxDocsPerGroup, true); + } else { + // Sort by fields + collector = TopFieldCollector.create(withinGroupSort, maxDocsPerGroup, fillSortFields, getScores, getMaxScores, true); + } + + collector.setScorer(fakeScorer); + collector.setNextReader(og.readerContext); + for(int docIDX=0;docIDX value now + groupValue = og.groupOrd == 0 ? null : FieldCache.DEFAULT.getTermsIndex(og.readerContext.reader, groupField).lookup(og.groupOrd, new BytesRef()); + } else { + groupValue = null; + } + //System.out.println(" group=" + (groupValue == null ? "null" : groupValue.utf8ToString()) + " ord=" + og.groupOrd + " count=" + og.count); + + final Comparable[] groupSortValues; + + if (fillSortFields) { + groupSortValues = new Comparable[comparators.length]; + for(int sortFieldIDX=0;sortFieldIDX 0) { + // Definitely competitive. + break; + } else if (compIDX == compIDXEnd) { + // Ties with bottom, except we know this docID is + // > docID in the queue (docs are visited in + // order), so not competitive: + //System.out.println(" doc doesn't compete w/ top groups"); + return; + } + } + groupCompetes = true; + for (FieldComparator fc : comparators) { + fc.copy(bottomSlot, doc); + // Necessary because some comparators cache + // details of bottom slot; this forces them to + // re-cache: + fc.setBottom(bottomSlot); + } + topGroupDoc = doc; + //System.out.println(" doc competes w/ top groups"); + } + } + + @Override + public boolean acceptsDocsOutOfOrder() { + return false; + } + + @Override + public void setNextReader(AtomicReaderContext readerContext) throws IOException { + if (subDocUpto != 0) { + processGroup(); + } + lastGroupOrd = -1; + subDocUpto = 0; + docBase = readerContext.docBase; + //System.out.println("setNextReader base=" + docBase + " r=" + readerContext.reader); + index = FieldCache.DEFAULT.getTermsIndex(readerContext.reader, groupField); + currentReaderContext = readerContext; + for (int i=0; i -

<p>The implementation is two-pass: the first pass ({@link
-  org.apache.lucene.search.grouping.FirstPassGroupingCollector})
-  gathers the top groups, and the second pass ({@link
-  org.apache.lucene.search.grouping.SecondPassGroupingCollector})
-  gathers documents within those groups. If the search is costly to
-  run you may want to use the {@link
-  org.apache.lucene.search.CachingCollector} class, which
-  caches hits and can (quickly) replay them for the second pass. This
-  way you only run the query once, but you pay a RAM cost to (briefly)
-  hold all hits. Results are returned as a {@link
-  org.apache.lucene.search.grouping.TopGroups} instance.

+
+<p>There are two grouping implementations here:
+  <ul>
+    <li>
+      Generic grouping that can group by any single-valued indexed
+      field, implemented as a two-pass collector: the first pass ({@link
+      org.apache.lucene.search.grouping.FirstPassGroupingCollector})
+      gathers the top groups, and the second pass ({@link
+      org.apache.lucene.search.grouping.SecondPassGroupingCollector})
+      gathers documents within those groups. If the search is costly to
+      run you may want to use the {@link
+      org.apache.lucene.search.CachingCollector} class, which caches
+      hits and can (quickly) replay them for the second pass. This way
+      you only run the query once, but you pay a RAM cost to (briefly)
+      hold all hits. Results are returned as a {@link
+      org.apache.lucene.search.grouping.TopGroups} instance.
+    </li>
+    <li>
+      Doc block grouping collector: this is a single pass collector that
+      is able to group by a field as long as all documents within each
+      group were indexed as a doc block using IndexWriter's
+      add/updateDocuments API. This is faster than the generic two-pass
+      collector, but only works for doc blocks.
+    </li>
+  </ul>
    Known limitations:

      @@ -66,7 +80,7 @@ group yourself.
-<p>Typical usage looks like this (using the {@link org.apache.lucene.search.CachingCollector}):

+<p>Typical usage for the generic two-pass collector looks like this (using the {@link org.apache.lucene.search.CachingCollector}):

       FirstPassGroupingCollector c1 = new FirstPassGroupingCollector("author", groupSort, groupOffset+topNGroups);
    @@ -111,5 +125,15 @@
       // Render groupsResult...
     
+
+<p>Typical usage for the doc block single-pass collector looks like this:
+
+<pre>
    +  DocBlockGroupingCollector c = new DocBlockGroupingCollector("author", groupSort, groupOffset+topNGroups, withinGroupSort, docOffset+docsPerGroup, needsScores);
    +  s.search(new TermQuery(new Term("content", searchTerm)), c);
+  TopGroups groupsResult = c.getTopGroups(groupOffset, docOffset, fillGroupValues, fillFields, getScores, getMaxScores);
    +
    +  // Render groupsResult...
    +
    + Index: modules/grouping/src/test/org/apache/lucene/search/grouping/TestGrouping.java --- modules/grouping/src/test/org/apache/lucene/search/grouping/TestGrouping.java Fri May 20 05:12:17 2011 -0400 +++ modules/grouping/src/test/org/apache/lucene/search/grouping/TestGrouping.java Sun May 22 08:30:46 2011 -0400 @@ -18,6 +18,7 @@ package org.apache.lucene.search.grouping; import java.util.*; +import java.io.IOException; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; @@ -183,6 +184,7 @@ private Comparator getComparator(Sort sort) { final SortField[] sortFields = sort.getSort(); return new Comparator() { + // @Override -- Not until Java 1.6 public int compare(GroupDoc d1, GroupDoc d2) { for(SortField sf : sortFields) { final int cmp; @@ -224,6 +226,16 @@ return fields; } + /* + private String groupToString(BytesRef b) { + if (b == null) { + return "null"; + } else { + return b.utf8ToString(); + } + } + */ + private TopGroups slowGrouping(GroupDoc[] groupDocs, String searchTerm, boolean fillFields, @@ -247,21 +259,25 @@ int totalHitCount = 0; Set knownGroups = new HashSet(); + //System.out.println("TEST: slowGrouping"); for(GroupDoc d : groupDocs) { // TODO: would be better to filter by searchTerm before sorting! if (!d.content.equals(searchTerm)) { continue; } totalHitCount++; + //System.out.println(" match id=" + d.id); if (doAllGroups) { if (!knownGroups.contains(d.group)) { knownGroups.add(d.group); + //System.out.println(" add group=" + groupToString(d.group)); } } List l = groups.get(d.group); if (l == null) { + //System.out.println(" add sortedGroup=" + groupToString(d.group)); sortedGroups.add(d.group); if (fillFields) { sortedGroupFields.add(fillFields(d, groupSort)); @@ -322,6 +338,62 @@ } } + private IndexReader getDocBlockReader(Directory dir, GroupDoc[] groupDocs) throws IOException { + // Coalesce by group, but in random order: + Collections.shuffle(Arrays.asList(groupDocs), random); + final Map> groupMap = new HashMap>(); + final List groupValues = new ArrayList(); + + for(GroupDoc groupDoc : groupDocs) { + if (!groupMap.containsKey(groupDoc.group)) { + groupValues.add(groupDoc.group); + groupMap.put(groupDoc.group, new ArrayList()); + } + groupMap.get(groupDoc.group).add(groupDoc); + } + + RandomIndexWriter w = new RandomIndexWriter( + random, + dir, + newIndexWriterConfig(TEST_VERSION_CURRENT, + new MockAnalyzer(random))); + + final List> updateDocs = new ArrayList>(); + //System.out.println("TEST: index groups"); + for(BytesRef group : groupValues) { + final List docs = new ArrayList(); + //System.out.println("TEST: group=" + (group == null ? "null" : group.utf8ToString())); + for(GroupDoc groupValue : groupMap.get(group)) { + Document doc = new Document(); + docs.add(doc); + if (groupValue.group != null) { + doc.add(newField("group", groupValue.group.utf8ToString(), Field.Index.NOT_ANALYZED)); + } + doc.add(newField("sort1", groupValue.sort1.utf8ToString(), Field.Index.NOT_ANALYZED)); + doc.add(newField("sort2", groupValue.sort2.utf8ToString(), Field.Index.NOT_ANALYZED)); + doc.add(new NumericField("id").setIntValue(groupValue.id)); + doc.add(newField("content", groupValue.content, Field.Index.NOT_ANALYZED)); + //System.out.println("TEST: doc content=" + groupValue.content + " group=" + (groupValue.group == null ? 
"null" : groupValue.group.utf8ToString()) + " sort1=" + groupValue.sort1.utf8ToString() + " id=" + groupValue.id); + } + // Add as a doc block: + w.addDocuments(docs); + if (group != null && random.nextInt(7) == 4) { + updateDocs.add(docs); + } + } + + for(List docs : updateDocs) { + // Just replaces docs w/ same docs: + w.updateDocuments(new Term("group", docs.get(0).get("group")), + docs); + } + + final IndexReader r = w.getReader(); + w.close(); + + return r; + } + public void testRandom() throws Exception { for(int iter=0;iter<3;iter++) { @@ -350,7 +422,7 @@ random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, - new MockAnalyzer(random)).setMergePolicy(newLogMergePolicy())); + new MockAnalyzer(random))); Document doc = new Document(); Document docNoGroup = new Document(); @@ -405,148 +477,197 @@ final IndexReader r = w.getReader(); w.close(); + // Build 2nd index, where docs are added in blocks by + // group, so we can use single pass collector + final Directory dir2 = newDirectory(); + final IndexReader r2 = getDocBlockReader(dir2, groupDocs); + final IndexSearcher s = new IndexSearcher(r); + final IndexSearcher s2 = new IndexSearcher(r2); - for(int searchIter=0;searchIter<100;searchIter++) { + final int[] docIDToID = FieldCache.DEFAULT.getInts(r, "id"); + final int[] docIDToID2 = FieldCache.DEFAULT.getInts(r2, "id"); - if (VERBOSE) { - System.out.println("TEST: searchIter=" + searchIter); - } + try { + for(int searchIter=0;searchIter<100;searchIter++) { - final String searchTerm = contentStrings[random.nextInt(contentStrings.length)]; - final boolean fillFields = random.nextBoolean(); - final boolean getScores = random.nextBoolean(); - final boolean getMaxScores = random.nextBoolean(); - final Sort groupSort = getRandomSort(); - // TODO: also test null (= sort by relevance) - final Sort docSort = getRandomSort(); - - final int topNGroups = _TestUtil.nextInt(random, 1, 30); - final int docsPerGroup = _TestUtil.nextInt(random, 1, 50); - final int groupOffset = _TestUtil.nextInt(random, 0, (topNGroups-1)/2); - //final int groupOffset = 0; - - final int docOffset = _TestUtil.nextInt(random, 0, docsPerGroup-1); - //final int docOffset = 0; - - final boolean doCache = random.nextBoolean(); - final boolean doAllGroups = random.nextBoolean(); - if (VERBOSE) { - System.out.println("TEST: groupSort=" + groupSort + " docSort=" + docSort + " searchTerm=" + searchTerm + " topNGroups=" + topNGroups + " groupOffset=" + groupOffset + " docOffset=" + docOffset + " doCache=" + doCache + " docsPerGroup=" + docsPerGroup + " doAllGroups=" + doAllGroups); - } - - final AllGroupsCollector allGroupsCollector; - if (doAllGroups) { - allGroupsCollector = new AllGroupsCollector("group"); - } else { - allGroupsCollector = null; - } - - final FirstPassGroupingCollector c1 = new FirstPassGroupingCollector("group", groupSort, groupOffset+topNGroups); - final CachingCollector cCache; - final Collector c; - - final boolean useWrappingCollector = random.nextBoolean(); - - if (doCache) { - final double maxCacheMB = random.nextDouble(); if (VERBOSE) { - System.out.println("TEST: maxCacheMB=" + maxCacheMB); + System.out.println("TEST: searchIter=" + searchIter); } - if (useWrappingCollector) { - if (doAllGroups) { - cCache = CachingCollector.create(c1, true, maxCacheMB); - c = MultiCollector.wrap(cCache, allGroupsCollector); + final String searchTerm = contentStrings[random.nextInt(contentStrings.length)]; + final boolean fillFields = random.nextBoolean(); + final boolean getScores = random.nextBoolean(); + final 
boolean getMaxScores = random.nextBoolean(); + final Sort groupSort = getRandomSort(); + //final Sort groupSort = new Sort(new SortField[] {new SortField("sort1", SortField.STRING), new SortField("id", SortField.INT)}); + // TODO: also test null (= sort by relevance) + final Sort docSort = getRandomSort(); + + final int topNGroups = _TestUtil.nextInt(random, 1, 30); + //final int topNGroups = 4; + final int docsPerGroup = _TestUtil.nextInt(random, 1, 50); + final int groupOffset = _TestUtil.nextInt(random, 0, (topNGroups-1)/2); + //final int groupOffset = 0; + + final int docOffset = _TestUtil.nextInt(random, 0, docsPerGroup-1); + //final int docOffset = 0; + + final boolean doCache = random.nextBoolean(); + final boolean doAllGroups = random.nextBoolean(); + if (VERBOSE) { + System.out.println("TEST: groupSort=" + groupSort + " docSort=" + docSort + " searchTerm=" + searchTerm + " topNGroups=" + topNGroups + " groupOffset=" + groupOffset + " docOffset=" + docOffset + " doCache=" + doCache + " docsPerGroup=" + docsPerGroup + " doAllGroups=" + doAllGroups); + } + + final AllGroupsCollector allGroupsCollector; + if (doAllGroups) { + allGroupsCollector = new AllGroupsCollector("group"); + } else { + allGroupsCollector = null; + } + + final FirstPassGroupingCollector c1 = new FirstPassGroupingCollector("group", groupSort, groupOffset+topNGroups); + final CachingCollector cCache; + final Collector c; + + final boolean useWrappingCollector = random.nextBoolean(); + + if (doCache) { + final double maxCacheMB = random.nextDouble(); + if (VERBOSE) { + System.out.println("TEST: maxCacheMB=" + maxCacheMB); + } + + if (useWrappingCollector) { + if (doAllGroups) { + cCache = CachingCollector.create(c1, true, maxCacheMB); + c = MultiCollector.wrap(cCache, allGroupsCollector); + } else { + c = cCache = CachingCollector.create(c1, true, maxCacheMB); + } } else { - c = cCache = CachingCollector.create(c1, true, maxCacheMB); + // Collect only into cache, then replay multiple times: + c = cCache = CachingCollector.create(false, true, maxCacheMB); } } else { - // Collect only into cache, then replay multiple times: - c = cCache = CachingCollector.create(false, true, maxCacheMB); - } - } else { - cCache = null; - if (doAllGroups) { - c = MultiCollector.wrap(c1, allGroupsCollector); - } else { - c = c1; - } - } - - s.search(new TermQuery(new Term("content", searchTerm)), c); - - if (doCache && !useWrappingCollector) { - if (cCache.isCached()) { - // Replay for first-pass grouping - cCache.replay(c1); + cCache = null; if (doAllGroups) { - // Replay for all groups: - cCache.replay(allGroupsCollector); - } - } else { - // Replay by re-running search: - s.search(new TermQuery(new Term("content", searchTerm)), c1); - if (doAllGroups) { - s.search(new TermQuery(new Term("content", searchTerm)), allGroupsCollector); + c = MultiCollector.wrap(c1, allGroupsCollector); + } else { + c = c1; } } - } + + s.search(new TermQuery(new Term("content", searchTerm)), c); - final Collection topGroups = c1.getTopGroups(groupOffset, fillFields); - final TopGroups groupsResult; - - if (topGroups != null) { - - if (VERBOSE) { - System.out.println("TEST: topGroups"); - for (SearchGroup searchGroup : topGroups) { - System.out.println(" " + (searchGroup.groupValue == null ? 
"null" : searchGroup.groupValue.utf8ToString()) + ": " + Arrays.deepToString(searchGroup.sortValues)); + if (doCache && !useWrappingCollector) { + if (cCache.isCached()) { + // Replay for first-pass grouping + cCache.replay(c1); + if (doAllGroups) { + // Replay for all groups: + cCache.replay(allGroupsCollector); + } + } else { + // Replay by re-running search: + s.search(new TermQuery(new Term("content", searchTerm)), c1); + if (doAllGroups) { + s.search(new TermQuery(new Term("content", searchTerm)), allGroupsCollector); + } } } - final SecondPassGroupingCollector c2 = new SecondPassGroupingCollector("group", topGroups, groupSort, docSort, docOffset+docsPerGroup, getScores, getMaxScores, fillFields); - if (doCache) { - if (cCache.isCached()) { - if (VERBOSE) { - System.out.println("TEST: cache is intact"); + final Collection topGroups = c1.getTopGroups(groupOffset, fillFields); + final TopGroups groupsResult; + + if (topGroups != null) { + + if (VERBOSE) { + System.out.println("TEST: topGroups"); + for (SearchGroup searchGroup : topGroups) { + System.out.println(" " + (searchGroup.groupValue == null ? "null" : searchGroup.groupValue.utf8ToString()) + ": " + Arrays.deepToString(searchGroup.sortValues)); } - cCache.replay(c2); + } + + final SecondPassGroupingCollector c2 = new SecondPassGroupingCollector("group", topGroups, groupSort, docSort, docOffset+docsPerGroup, getScores, getMaxScores, fillFields); + if (doCache) { + if (cCache.isCached()) { + if (VERBOSE) { + System.out.println("TEST: cache is intact"); + } + cCache.replay(c2); + } else { + if (VERBOSE) { + System.out.println("TEST: cache was too large"); + } + s.search(new TermQuery(new Term("content", searchTerm)), c2); + } } else { - if (VERBOSE) { - System.out.println("TEST: cache was too large"); - } s.search(new TermQuery(new Term("content", searchTerm)), c2); } + + if (doAllGroups) { + TopGroups tempTopGroups = c2.getTopGroups(docOffset); + groupsResult = new TopGroups(tempTopGroups, allGroupsCollector.getGroupCount()); + } else { + groupsResult = c2.getTopGroups(docOffset); + } } else { - s.search(new TermQuery(new Term("content", searchTerm)), c2); + groupsResult = null; + if (VERBOSE) { + System.out.println("TEST: no results"); + } } + final TopGroups expectedGroups = slowGrouping(groupDocs, searchTerm, fillFields, getScores, getMaxScores, doAllGroups, groupSort, docSort, topNGroups, docsPerGroup, groupOffset, docOffset); + + if (VERBOSE) { + if (expectedGroups == null) { + System.out.println("TEST: no expected groups"); + } else { + System.out.println("TEST: expected groups"); + for(GroupDocs gd : expectedGroups.groups) { + System.out.println(" group=" + (gd.groupValue == null ? "null" : gd.groupValue.utf8ToString())); + for(ScoreDoc sd : gd.scoreDocs) { + System.out.println(" id=" + sd.doc); + } + } + } + } + // NOTE: intentional but temporary field cache insanity! 
+ assertEquals(docIDToID, expectedGroups, groupsResult); + + final boolean needsScores = getScores || getMaxScores || docSort == null; + final DocBlockGroupingCollector c3 = new DocBlockGroupingCollector("group", groupSort, groupOffset+topNGroups, docSort, docOffset+docsPerGroup, needsScores); + final AllGroupsCollector allGroupsCollector2; + final Collector c4; if (doAllGroups) { - TopGroups tempTopGroups = c2.getTopGroups(docOffset); - groupsResult = new TopGroups(tempTopGroups, allGroupsCollector.getGroupCount()); + allGroupsCollector2 = new AllGroupsCollector("group"); + c4 = MultiCollector.wrap(c3, allGroupsCollector2); } else { - groupsResult = c2.getTopGroups(docOffset); + allGroupsCollector2 = null; + c4 = c3; } - } else { - groupsResult = null; - if (VERBOSE) { - System.out.println("TEST: no results"); + s2.search(new TermQuery(new Term("content", searchTerm)), c4); + final TopGroups tempTopGroups2 = c3.getTopGroups(groupOffset, docOffset, true, fillFields, getScores, getMaxScores); + final TopGroups groupsResult2; + if (doAllGroups && tempTopGroups2 != null) { + groupsResult2 = new TopGroups(tempTopGroups2, allGroupsCollector2.getGroupCount()); + } else { + groupsResult2 = tempTopGroups2; } + assertEquals(docIDToID2, expectedGroups, groupsResult2); } - - final TopGroups expectedGroups = slowGrouping(groupDocs, searchTerm, fillFields, getScores, getMaxScores, doAllGroups, groupSort, docSort, topNGroups, docsPerGroup, groupOffset, docOffset); - - try { - // NOTE: intentional but temporary field cache insanity! - assertEquals(FieldCache.DEFAULT.getInts(r, "id"), expectedGroups, groupsResult); - } finally { - FieldCache.DEFAULT.purge(r); - } + } finally { + FieldCache.DEFAULT.purge(r); + FieldCache.DEFAULT.purge(r2); } r.close(); dir.close(); + + r2.close(); + dir2.close(); } }