Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May 17 05:48:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May 17 06:50:54 2011 -0400 @@ -274,11 +274,9 @@ flushControl.setClosed(); } - boolean updateDocument(final Document doc, final Analyzer analyzer, - final Term delTerm) throws CorruptIndexException, IOException { + private boolean preUpdate() throws CorruptIndexException, IOException { ensureOpen(); boolean maybeMerge = false; - final boolean isUpdate = delTerm != null; if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) { // Help out flushing any queued DWPTs so we can un-stall: if (infoStream != null) { @@ -303,9 +301,64 @@ message("continue indexing after helpling out flushing DocumentsWriter is healthy"); } } + return maybeMerge; + } - final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), - this, doc); + private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException { + if (flushingDWPT != null) { + maybeMerge |= doFlush(flushingDWPT); + } else { + final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); + if (nextPendingFlush != null) { + maybeMerge |= doFlush(nextPendingFlush); + } + } + + return maybeMerge; + } + + boolean updateDocuments(final Iterable docs, final Analyzer analyzer, + final Term delTerm) throws CorruptIndexException, IOException { + boolean maybeMerge = preUpdate(); + + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this); + final DocumentsWriterPerThread flushingDWPT; + + try { + + if (!perThread.isActive()) { + ensureOpen(); + assert false: "perThread is not active but we are still open"; + } + + final DocumentsWriterPerThread dwpt = perThread.perThread; + try { + int docCount = 0; + for(Document doc : docs) { + dwpt.updateDocument(doc, analyzer, delTerm); + 
docCount++; + } + numDocsInRAM.addAndGet(docCount); + } finally { + if (dwpt.checkAndResetHasAborted()) { + flushControl.doOnAbort(perThread); + } + } + final boolean isUpdate = delTerm != null; + flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); + } finally { + perThread.unlock(); + } + + return postUpdate(flushingDWPT, maybeMerge); + } + + boolean updateDocument(final Document doc, final Analyzer analyzer, + final Term delTerm) throws CorruptIndexException, IOException { + + boolean maybeMerge = preUpdate(); + + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this); final DocumentsWriterPerThread flushingDWPT; try { @@ -324,20 +377,13 @@ flushControl.doOnAbort(perThread); } } + final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); } finally { perThread.unlock(); } - - if (flushingDWPT != null) { - maybeMerge |= doFlush(flushingDWPT); - } else { - final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); - if (nextPendingFlush != null) { - maybeMerge |= doFlush(nextPendingFlush); - } - } - return maybeMerge; + + return postUpdate(flushingDWPT, maybeMerge); } private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue May 17 05:48:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue May 17 06:50:54 2011 -0400 @@ -162,8 +162,6 @@ stallControl.updateStalled(this); assert assertMemory(); } - - } synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { @@ -217,7 +215,7 @@ assert assertMemory(); // Take it out of the loop this DWPT is stale perThreadPool.replaceForFlush(state, closed); - }finally { + } finally { stallControl.updateStalled(this); } } Index: 
lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue May 17 05:48:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue May 17 06:50:54 2011 -0400 @@ -232,6 +232,10 @@ } finally { if (!success) { if (!aborting) { + // nocommit: must fix this to delete any + // successfully added docIDs in the addDocuments + // case: + // mark document as deleted deleteDocID(docState.docID); numDocsInRAM++; Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue May 17 05:48:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue May 17 06:50:54 2011 -0400 @@ -19,7 +19,6 @@ import java.util.Iterator; import java.util.concurrent.locks.ReentrantLock; -import org.apache.lucene.document.Document; import org.apache.lucene.index.FieldInfos.FieldNumberBiMap; import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder; import org.apache.lucene.index.codecs.CodecProvider; @@ -212,7 +211,7 @@ // don't recycle DWPT by default } - public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc); + public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter); /** * Returns an iterator providing access to all {@link ThreadState} Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java --- lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue May 17 05:48:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue May 17 06:50:54 2011 -0400 @@ -1231,6 +1231,125 @@ } /** + * Adds multiple documents. 
The documents are atomically + * added to the index (all or none, as visible from {@link + * #commit} or {@link IndexReader#open(IndexWriter)}), and + * will be added to a single segment with sequentially + * assigned docIDs in the order the iterator provides + * them. + * + *

See {@link #addDocument(Document)} for details on + * index and IndexWriter state after an Exception, and + * flushing/merging temporary free space requirements.

+ * + *

NOTE: if this method hits an OutOfMemoryError + * you should immediately close the writer. See above for details.

+ * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void addDocuments(Iterable docs) throws CorruptIndexException, IOException { + addDocuments(docs, analyzer); + } + + /** + * Adds multiple documents. The documents are atomically + * added to the index (all or none, as visible from {@link + * #commit} or {@link IndexReader#open(IndexWriter)}), and + * will be added to a single segment with sequentially + * assigned docIDs in the order the iterator provides + * them. + * + *

See {@link #addDocument(Document)} for details on + * index and IndexWriter state after an Exception, and + * flushing/merging temporary free space requirements.

+ * + *

NOTE: if this method hits an OutOfMemoryError + * you should immediately close the writer. See above for details.

+ * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void addDocuments(Iterable docs, Analyzer analyzer) throws CorruptIndexException, IOException { + updateDocuments(null, docs, analyzer); + } + + /** + * First deletes all documents matching the provided + * delTerm, then adds multiple documents. The delete and + * adds are atomic (all or none, as visible from {@link + * #commit} or {@link IndexReader#open(IndexWriter)}), and + * will be added to a single segment with sequentially + * assigned docIDs in the order the iterator provides + * them. + * + *

See {@link #addDocument(Document)} for details on + * index and IndexWriter state after an Exception, and + * flushing/merging temporary free space requirements.

+ * + *

NOTE: if this method hits an OutOfMemoryError + * you should immediately close the writer. See above for details.

+ * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void updateDocuments(Term delTerm, Iterable docs) throws CorruptIndexException, IOException { + updateDocuments(delTerm, docs, analyzer); + } + + /** + * First deletes all documents matching the provided + * delTerm, then adds multiple documents. The delete and + * adds are atomic (all or none, as visible from {@link + * #commit} or {@link IndexReader#open(IndexWriter)}), and + * will be added to a single segment with sequentially + * assigned docIDs in the order the iterator provides + * them. + * + *

See {@link #addDocument(Document)} for details on + * index and IndexWriter state after an Exception, and + * flushing/merging temporary free space requirements.

+ * + *

NOTE: if this method hits an OutOfMemoryError + * you should immediately close the writer. See above for details.

+ * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public void updateDocuments(Term delTerm, Iterable docs, Analyzer analyzer) throws CorruptIndexException, IOException { + ensureOpen(); + try { + boolean success = false; + boolean anySegmentFlushed = false; + try { + anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm); + success = true; + } finally { + if (!success && infoStream != null) { + message("hit exception updating document"); + } + } + if (anySegmentFlushed) { + maybeMerge(); + } + } catch (OutOfMemoryError oom) { + handleOOM(oom, "updateDocuments"); + } + } + + /** * Deletes the document(s) containing term. * *

NOTE: if this method hits an OutOfMemoryError Index: lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java --- lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Tue May 17 05:48:17 2011 -0400 +++ lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Tue May 17 06:50:54 2011 -0400 @@ -18,7 +18,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.lucene.document.Document; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc /** @@ -48,7 +47,7 @@ } @Override - public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) { + public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) { ThreadState threadState = threadBindings.get(requestingThread); if (threadState != null) { if (threadState.tryLock()) {