diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 1dc05e4..1eec53c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -202,7 +202,6 @@ final class DocumentsWriter { * discarding any docs added since last flush. */ synchronized void abort() throws IOException { boolean success = false; - synchronized (this) { deleteQueue.clear(); } @@ -233,6 +232,7 @@ final class DocumentsWriter { perThread.unlock(); } } + flushControl.waitForFlush(); success = true; } finally { if (infoStream.isEnabled("DW")) { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index ff38d7d..ed72479 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -327,8 +327,10 @@ final class DocumentsWriterFlushControl implements MemoryController { synchronized void setClosed() { // set by DW to signal that we should not release new DWPT after close - this.closed = true; - perThreadPool.deactivateUnreleasedStates(); + if (!closed) { + this.closed = true; + perThreadPool.deactivateUnreleasedStates(); + } } /** diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 24d00f5..ac0eedd 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -463,10 +463,6 @@ class DocumentsWriterPerThread { pendingDeletes.docIDs.clear(); } - if (infoStream.isEnabled("DWPT")) { - infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM); - } - if (aborting) { if (infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", "flush: skip because aborting is set"); @@ -474,6 +470,10 @@ class DocumentsWriterPerThread { return null; } + if (infoStream.isEnabled("DWPT")) { + infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM); + } + boolean success = false; try { diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index eb46001..5e392b3 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -841,15 +841,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { */ public void close(boolean waitForMerges) throws CorruptIndexException, IOException { - // Ensure that only one thread actually gets to do the closing: - if (shouldClose()) { - // If any methods have hit OutOfMemoryError, then abort - // on close, in case the internal state of IndexWriter - // or DocumentsWriter is corrupt - if (hitOOM) - rollbackInternal(); - else - closeInternal(waitForMerges); + // Ensure that only one thread actually gets to do the + // closing, and make sure no commit is also in progress: + synchronized(commitLock) { + if (shouldClose()) { + // If any methods have hit OutOfMemoryError, then abort + // on close, in case the internal state of IndexWriter + // or DocumentsWriter is corrupt + if (hitOOM) { + rollbackInternal(); + } else { + closeInternal(waitForMerges, !hitOOM); + } + } } } @@ -868,12 +872,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { // successfully) or another (fails to close) doWait(); } - } else + } else { return false; + } } } - private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException { + private void closeInternal(boolean waitForMerges, boolean doFlush) throws CorruptIndexException, IOException { try { @@ -889,8 +894,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { // Only allow a new merge to be triggered if we are // going to wait for merges: - if (!hitOOM) { + if (doFlush) { flush(waitForMerges, true); + } else { + docWriter.abort(); // already closed } if (waitForMerges) @@ -910,7 +917,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { infoStream.message("IW", "now call final commit()"); } - if (!hitOOM) { + if (doFlush) { commitInternal(null); } @@ -1774,9 +1781,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { public void rollback() throws IOException { ensureOpen(); - // Ensure that only one thread actually gets to do the closing: - if (shouldClose()) - rollbackInternal(); + // Ensure that only one thread actually gets to do the + // closing, and make sure no commit is also in progress: + synchronized(commitLock) { + if (shouldClose()) { + rollbackInternal(); + } + } } private void rollbackInternal() throws IOException { @@ -1786,6 +1797,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "rollback"); } + try { synchronized(this) { @@ -1826,7 +1838,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { if (infoStream.isEnabled("IW") ) { infoStream.message("IW", "rollback: infos=" + segString(segmentInfos)); } - + docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes docWriter.abort(); assert testPoint("rollback before checkpoint"); @@ -1854,7 +1866,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { } } - closeInternal(false); + closeInternal(false, false); } /** @@ -2482,99 +2494,102 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { public final void prepareCommit(Map commitUserData) throws CorruptIndexException, IOException { ensureOpen(false); - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "prepareCommit: flush"); - infoStream.message("IW", " index before flush " + segString()); - } + synchronized(commitLock) { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "prepareCommit: flush"); + infoStream.message("IW", " index before flush " + segString()); + } - if (hitOOM) { - throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit"); - } + if (hitOOM) { + throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit"); + } - if (pendingCommit != null) { - throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit"); - } + if (pendingCommit != null) { + throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit"); + } - doBeforeFlush(); - assert testPoint("startDoFlush"); - SegmentInfos toCommit = null; - boolean anySegmentsFlushed = false; + doBeforeFlush(); + assert testPoint("startDoFlush"); + SegmentInfos toCommit = null; + boolean anySegmentsFlushed = false; - // This is copied from doFlush, except it's modified to - // clone & incRef the flushed SegmentInfos inside the - // sync block: + // This is copied from doFlush, except it's modified to + // clone & incRef the flushed SegmentInfos inside the + // sync block: - try { + try { - synchronized (fullFlushLock) { - boolean flushSuccess = false; - boolean success = false; - try { - anySegmentsFlushed = docWriter.flushAllThreads(); - if (!anySegmentsFlushed) { - // prevent double increment since docWriter#doFlush increments the flushcount - // if we flushed anything. - flushCount.incrementAndGet(); - } - flushSuccess = true; + synchronized (fullFlushLock) { + boolean flushSuccess = false; + boolean success = false; + try { + anySegmentsFlushed = docWriter.flushAllThreads(); + if (!anySegmentsFlushed) { + // prevent double increment since docWriter#doFlush increments the flushcount + // if we flushed anything. + flushCount.incrementAndGet(); + } + flushSuccess = true; - synchronized(this) { - maybeApplyDeletes(true); - - readerPool.commit(segmentInfos); - - // Must clone the segmentInfos while we still - // hold fullFlushLock and while sync'd so that - // no partial changes (eg a delete w/o - // corresponding add from an updateDocument) can - // sneak into the commit point: - toCommit = segmentInfos.clone(); - - pendingCommitChangeCount = changeCount; - - // This protects the segmentInfos we are now going - // to commit. This is important in case, eg, while - // we are trying to sync all referenced files, a - // merge completes which would otherwise have - // removed the files we are now syncing. - filesToCommit = toCommit.files(directory, false); - deleter.incRef(filesToCommit); - } - success = true; - } finally { - if (!success) { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "hit exception during prepareCommit"); + synchronized(this) { + maybeApplyDeletes(true); + + readerPool.commit(segmentInfos); + + // Must clone the segmentInfos while we still + // hold fullFlushLock and while sync'd so that + // no partial changes (eg a delete w/o + // corresponding add from an updateDocument) can + // sneak into the commit point: + toCommit = segmentInfos.clone(); + + pendingCommitChangeCount = changeCount; + + // This protects the segmentInfos we are now going + // to commit. This is important in case, eg, while + // we are trying to sync all referenced files, a + // merge completes which would otherwise have + // removed the files we are now syncing. + filesToCommit = toCommit.files(directory, false); + deleter.incRef(filesToCommit); } + success = true; + } finally { + if (!success) { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "hit exception during prepareCommit"); + } + } + // Done: finish the full flush! + docWriter.finishFullFlush(flushSuccess); + doAfterFlush(); } - // Done: finish the full flush! - docWriter.finishFullFlush(flushSuccess); - doAfterFlush(); } + } catch (OutOfMemoryError oom) { + handleOOM(oom, "prepareCommit"); } - } catch (OutOfMemoryError oom) { - handleOOM(oom, "prepareCommit"); - } - boolean success = false; - try { - if (anySegmentsFlushed) { - maybeMerge(); - } - success = true; - } finally { - if (!success) { - synchronized (this) { - deleter.decRef(filesToCommit); - filesToCommit = null; + boolean success = false; + try { + if (anySegmentsFlushed) { + maybeMerge(); + } + success = true; + } finally { + if (!success) { + synchronized (this) { + deleter.decRef(filesToCommit); + filesToCommit = null; + } } } - } - startCommit(toCommit, commitUserData); + startCommit(toCommit, commitUserData); + } } - // Used only by commit, below; lock order is commitLock -> IW + // Used only by commit and prepareCommit, below; lock + // order is commitLock -> IW private final Object commitLock = new Object(); /** @@ -2634,6 +2649,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { } synchronized(commitLock) { + ensureOpen(false); + if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: enter lock"); } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java index 750f721..325cd6d 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java @@ -19,6 +19,10 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; @@ -31,10 +35,13 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util._TestUtil; +import com.carrotsearch.randomizedtesting.annotations.Repeat; + /** * MultiThreaded IndexWriter tests */ @@ -456,41 +463,132 @@ public class TestIndexWriterWithThreads extends LuceneTestCase { dir.close(); } - static class DelayedIndexAndCloseRunnable extends Thread { - private final Directory dir; - boolean failed = false; - Throwable failure = null; - private final CountDownLatch startIndexing = new CountDownLatch(1); - private CountDownLatch iwConstructed; - - public DelayedIndexAndCloseRunnable(Directory dir, - CountDownLatch iwConstructed) { - this.dir = dir; - this.iwConstructed = iwConstructed; - } - - public void startIndexing() { - this.startIndexing.countDown(); - } - - @Override - public void run() { - try { - Document doc = new Document(); - Field field = newTextField("field", "testData", Field.Store.YES); - doc.add(field); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( - TEST_VERSION_CURRENT, new MockAnalyzer(random()))); - iwConstructed.countDown(); - startIndexing.await(); - writer.addDocument(doc); - writer.close(); - } catch (Throwable e) { - failed = true; - failure = e; - failure.printStackTrace(System.out); - return; - } - } - } + static class DelayedIndexAndCloseRunnable extends Thread { + private final Directory dir; + boolean failed = false; + Throwable failure = null; + private final CountDownLatch startIndexing = new CountDownLatch(1); + private CountDownLatch iwConstructed; + + public DelayedIndexAndCloseRunnable(Directory dir, + CountDownLatch iwConstructed) { + this.dir = dir; + this.iwConstructed = iwConstructed; + } + + public void startIndexing() { + this.startIndexing.countDown(); + } + + @Override + public void run() { + try { + Document doc = new Document(); + Field field = newTextField("field", "testData", Field.Store.YES); + doc.add(field); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( + TEST_VERSION_CURRENT, new MockAnalyzer(random()))); + iwConstructed.countDown(); + startIndexing.await(); + writer.addDocument(doc); + writer.close(); + } catch (Throwable e) { + failed = true; + failure = e; + failure.printStackTrace(System.out); + return; + } + } + } + + // LUCENE-4147 + public void testRollbackAndCommitWithThreads() throws Exception { + final MockDirectoryWrapper d = newFSDirectory(_TestUtil.getTempDir("RollbackAndCommitWithThreads")); + d.setPreventDoubleWrite(false); + + final int threadCount = _TestUtil.nextInt(random(), 2, 6); + + final AtomicReference writerRef = new AtomicReference(); + writerRef.set(new IndexWriter(d, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())))); + final LineFileDocs docs = new LineFileDocs(random()); + final Thread[] threads = new Thread[threadCount]; + final int iters = atLeast(1000); + final AtomicBoolean failed = new AtomicBoolean(); + final Lock rollbackLock = new ReentrantLock(); + final Lock commitLock = new ReentrantLock(); + for(int threadID=0;threadID