Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java (revision 1351292) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java (working copy) @@ -19,6 +19,10 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; @@ -31,6 +35,7 @@ import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util._TestUtil; @@ -456,41 +461,124 @@ dir.close(); } - static class DelayedIndexAndCloseRunnable extends Thread { - private final Directory dir; - boolean failed = false; - Throwable failure = null; - private final CountDownLatch startIndexing = new CountDownLatch(1); - private CountDownLatch iwConstructed; + static class DelayedIndexAndCloseRunnable extends Thread { + private final Directory dir; + boolean failed = false; + Throwable failure = null; + private final CountDownLatch startIndexing = new CountDownLatch(1); + private CountDownLatch iwConstructed; - public DelayedIndexAndCloseRunnable(Directory dir, - CountDownLatch iwConstructed) { - this.dir = dir; - this.iwConstructed = iwConstructed; - } + public DelayedIndexAndCloseRunnable(Directory dir, + CountDownLatch iwConstructed) { + this.dir = dir; + this.iwConstructed = iwConstructed; + } - public void startIndexing() { - this.startIndexing.countDown(); - } + public void startIndexing() { + this.startIndexing.countDown(); + } - @Override - public void run() { - try { - Document doc = new Document(); - Field field = newTextField("field", "testData", Field.Store.YES); - doc.add(field); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( - TEST_VERSION_CURRENT, new MockAnalyzer(random()))); - iwConstructed.countDown(); - startIndexing.await(); - writer.addDocument(doc); - writer.close(); - } catch (Throwable e) { - failed = true; - failure = e; - failure.printStackTrace(System.out); - return; - } - } - } + @Override + public void run() { + try { + Document doc = new Document(); + Field field = newTextField("field", "testData", Field.Store.YES); + doc.add(field); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( + TEST_VERSION_CURRENT, new MockAnalyzer(random()))); + iwConstructed.countDown(); + startIndexing.await(); + writer.addDocument(doc); + writer.close(); + } catch (Throwable e) { + failed = true; + failure = e; + failure.printStackTrace(System.out); + return; + } + } + } + + // LUCENE-4147 + public void testRollbackAndCommitWithThreads() throws Exception { + final MockDirectoryWrapper d = newFSDirectory(_TestUtil.getTempDir("RollbackAndCommitWithThreads")); + d.setPreventDoubleWrite(false); + + final int threadCount = _TestUtil.nextInt(random(), 2, 6); + + final AtomicReference writerRef = new AtomicReference(); + writerRef.set(new IndexWriter(d, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())))); + final LineFileDocs docs = new LineFileDocs(random()); + final Thread[] threads = new Thread[threadCount]; + final int iters = atLeast(1000); + final AtomicBoolean failed = new AtomicBoolean(); + final Lock rollbackLock = new ReentrantLock(); + final Lock commitLock = new ReentrantLock(); + for(int threadID=0;threadID commitUserData) throws CorruptIndexException, IOException { ensureOpen(false); - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "prepareCommit: flush"); - infoStream.message("IW", " index before flush " + segString()); - } + synchronized(commitLock) { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "prepareCommit: flush"); + infoStream.message("IW", " index before flush " + segString()); + } - if (hitOOM) { - throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit"); - } + if (hitOOM) { + throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit"); + } - if (pendingCommit != null) { - throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit"); - } + if (pendingCommit != null) { + throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit"); + } - doBeforeFlush(); - assert testPoint("startDoFlush"); - SegmentInfos toCommit = null; - boolean anySegmentsFlushed = false; + doBeforeFlush(); + assert testPoint("startDoFlush"); + SegmentInfos toCommit = null; + boolean anySegmentsFlushed = false; - // This is copied from doFlush, except it's modified to - // clone & incRef the flushed SegmentInfos inside the - // sync block: + // This is copied from doFlush, except it's modified to + // clone & incRef the flushed SegmentInfos inside the + // sync block: - try { + try { - synchronized (fullFlushLock) { - boolean flushSuccess = false; - boolean success = false; - try { - anySegmentsFlushed = docWriter.flushAllThreads(); - if (!anySegmentsFlushed) { - // prevent double increment since docWriter#doFlush increments the flushcount - // if we flushed anything. - flushCount.incrementAndGet(); - } - flushSuccess = true; + synchronized (fullFlushLock) { + boolean flushSuccess = false; + boolean success = false; + try { + anySegmentsFlushed = docWriter.flushAllThreads(); + if (!anySegmentsFlushed) { + // prevent double increment since docWriter#doFlush increments the flushcount + // if we flushed anything. + flushCount.incrementAndGet(); + } + flushSuccess = true; - synchronized(this) { - maybeApplyDeletes(true); + synchronized(this) { + maybeApplyDeletes(true); - readerPool.commit(segmentInfos); + readerPool.commit(segmentInfos); - // Must clone the segmentInfos while we still - // hold fullFlushLock and while sync'd so that - // no partial changes (eg a delete w/o - // corresponding add from an updateDocument) can - // sneak into the commit point: - toCommit = segmentInfos.clone(); + // Must clone the segmentInfos while we still + // hold fullFlushLock and while sync'd so that + // no partial changes (eg a delete w/o + // corresponding add from an updateDocument) can + // sneak into the commit point: + toCommit = segmentInfos.clone(); - pendingCommitChangeCount = changeCount; + pendingCommitChangeCount = changeCount; - // This protects the segmentInfos we are now going - // to commit. This is important in case, eg, while - // we are trying to sync all referenced files, a - // merge completes which would otherwise have - // removed the files we are now syncing. - filesToCommit = toCommit.files(directory, false); - deleter.incRef(filesToCommit); - } - success = true; - } finally { - if (!success) { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "hit exception during prepareCommit"); + // This protects the segmentInfos we are now going + // to commit. This is important in case, eg, while + // we are trying to sync all referenced files, a + // merge completes which would otherwise have + // removed the files we are now syncing. + filesToCommit = toCommit.files(directory, false); + deleter.incRef(filesToCommit); } + success = true; + } finally { + if (!success) { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "hit exception during prepareCommit"); + } + } + // Done: finish the full flush! + docWriter.finishFullFlush(flushSuccess); + doAfterFlush(); } - // Done: finish the full flush! - docWriter.finishFullFlush(flushSuccess); - doAfterFlush(); } + } catch (OutOfMemoryError oom) { + handleOOM(oom, "prepareCommit"); } - } catch (OutOfMemoryError oom) { - handleOOM(oom, "prepareCommit"); - } - boolean success = false; - try { - if (anySegmentsFlushed) { - maybeMerge(); - } - success = true; - } finally { - if (!success) { - synchronized (this) { - deleter.decRef(filesToCommit); - filesToCommit = null; + boolean success = false; + try { + if (anySegmentsFlushed) { + maybeMerge(); } + success = true; + } finally { + if (!success) { + synchronized (this) { + deleter.decRef(filesToCommit); + filesToCommit = null; + } + } } + + startCommit(toCommit, commitUserData); } - - startCommit(toCommit, commitUserData); } - // Used only by commit, below; lock order is commitLock -> IW + // Used only by commit and prepareCommit, below; lock + // order is commitLock -> IW private final Object commitLock = new Object(); /** @@ -2634,6 +2649,8 @@ } synchronized(commitLock) { + ensureOpen(false); + if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: enter lock"); } Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (revision 1351292) +++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (working copy) @@ -463,10 +463,6 @@ pendingDeletes.docIDs.clear(); } - if (infoStream.isEnabled("DWPT")) { - infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM); - } - if (aborting) { if (infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", "flush: skip because aborting is set"); @@ -474,6 +470,10 @@ return null; } + if (infoStream.isEnabled("DWPT")) { + infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM); + } + boolean success = false; try {