Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 1483748)
+++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -29,6 +29,7 @@
 import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.index.FieldInfos.FieldNumbers;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -240,6 +241,63 @@
       }
     }
   }
+
+  synchronized void lockAndAbortAll() {
+    assert indexWriter.holdsFullFlushLock();
+    if (infoStream.isEnabled("DW")) {
+      infoStream.message("DW", "lockAndAbortAll");
+    }
+    boolean success = false;
+    try {
+      deleteQueue.clear();
+      final int limit = perThreadPool.getMaxThreadStates();
+      for (int i = 0; i < limit; i++) {
+        final ThreadState perThread = perThreadPool.getThreadState(i);
+        perThread.lock();
+        if (perThread.isActive()) { // we might already be closed, so only abort active states
+          try {
+            perThread.dwpt.abort();
+          } finally {
+            perThread.dwpt.checkAndResetHasAborted();
+            flushControl.doOnAbort(perThread);
+          }
+        }
+      }
+      deleteQueue.clear();
+      flushControl.abortPendingFlushes();
+      flushControl.waitForFlush();
+      success = true;
+    } finally {
+      if (infoStream.isEnabled("DW")) {
+        infoStream.message("DW", "finished lockAndAbortAll success=" + success);
+      }
+      if (!success) {
+        // if something went wrong we unlock all states again
+        unlockAllAfterAbortAll();
+      }
+    }
+  }
+
+  final synchronized void unlockAllAfterAbortAll() {
+    assert indexWriter.holdsFullFlushLock();
+    if (infoStream.isEnabled("DW")) {
+      infoStream.message("DW", "unlockAll");
+    }
+    final int limit = perThreadPool.getMaxThreadStates();
+    for (int i = 0; i < limit; i++) {
+      try {
+        final ThreadState perThread = perThreadPool.getThreadState(i);
+        if (perThread.isHeldByCurrentThread()) {
+          perThread.unlock();
+        }
+      } catch (Throwable e) {
+        if (infoStream.isEnabled("DW")) {
+          infoStream.message("DW", "unlockAll: could not unlock state: " + i + " msg:" + e.getMessage());
+        }
+        // ignore & keep on unlocking
+      }
+    }
+  }
 
   boolean anyChanges() {
     if (infoStream.isEnabled("DW")) {
Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java	(revision 1483748)
+++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java	(working copy)
@@ -240,6 +240,7 @@
   }
 
   public synchronized void waitForFlush() {
+    assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be held when waiting on a flush";
     while (flushingWriters.size() != 0) {
       try {
         this.wait();
@@ -606,9 +607,10 @@
     for (DocumentsWriterPerThread dwpt : flushQueue) {
       try {
         dwpt.abort();
-        doAfterFlush(dwpt);
       } catch (Throwable ex) {
         // ignore - keep on aborting the flush queue
+      } finally {
+        doAfterFlush(dwpt);
       }
     }
     for (BlockedFlush blockedFlush : blockedFlushes) {
@@ -616,9 +618,10 @@
         flushingWriters
             .put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
         blockedFlush.dwpt.abort();
-        doAfterFlush(blockedFlush.dwpt);
       } catch (Throwable ex) {
         // ignore - keep on aborting the blocked queue
+      } finally {
+        doAfterFlush(blockedFlush.dwpt);
       }
     }
   } finally {
Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java	(revision 1483748)
+++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java	(working copy)
@@ -274,7 +274,6 @@
    * given ord.
    */
   ThreadState getThreadState(int ord) {
-    assert ord < numThreadStatesActive;
     return threadStates[ord];
   }
 
Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java	(revision 1483748)
+++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -957,7 +957,7 @@
       if (doFlush) {
         flush(waitForMerges, true);
       } else {
-        docWriter.abort(); // already closed
+        docWriter.abort(); // already closed -- never sync on IW
       }
     } finally {
@@ -2006,7 +2006,7 @@
     bufferedDeletesStream.clear();
     docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
-    docWriter.abort();
+    docWriter.abort(); // don't sync on IW here
     synchronized(this) {
       if (pendingCommit != null) {
@@ -2066,7 +2066,13 @@
   * visible until a {@link #commit()} has been called. This method
   * can be rolled back using {@link #rollback()}.
   *
-   * <p>NOTE: this method is much faster than using deleteDocuments( new MatchAllDocsQuery() ).
+   * <p>NOTE: this method is much faster than using deleteDocuments( new MatchAllDocsQuery() ).
+   * Yet, this method also has different semantics compared to {@link #deleteDocuments(Query)}
+   * / {@link #deleteDocuments(Query...)}: internal data structures are cleared, all segment
+   * information is forcefully dropped, and "viral" semantics such as omitted norms or cleared
+   * doc value types are reset. Essentially, a call to {@link #deleteAll()} is equivalent to
+   * creating a new {@link IndexWriter} with {@link OpenMode#CREATE}, whereas a delete query
+   * only marks documents as deleted.
   *
   * <p>NOTE: this method will forcefully abort all merges
   *    in progress. If other threads are running {@link
@@ -2074,40 +2080,58 @@
   *    {@link #forceMergeDeletes} methods, they may receive
   *    {@link MergePolicy.MergeAbortedException}s.
   */
-  public synchronized void deleteAll() throws IOException {
+  public void deleteAll() throws IOException {
     ensureOpen();
+    // Remove any buffered docs
     boolean success = false;
-    try {
-
-      // Abort any running merges
-      finishMerges(false);
-
-      // Remove any buffered docs
-      docWriter.abort();
-
-      // Remove all segments
-      segmentInfos.clear();
-
-      // Ask deleter to locate unreferenced files & remove them:
-      deleter.checkpoint(segmentInfos, false);
-      deleter.refresh();
-
-      globalFieldNumberMap.clear();
-
-      // Don't bother saving any changes in our segmentInfos
-      readerPool.dropAll(false);
-
-      // Mark that the index has changed
-      ++changeCount;
-      segmentInfos.changed();
-      success = true;
-    } catch (OutOfMemoryError oom) {
-      handleOOM(oom, "deleteAll");
-    } finally {
-      if (!success) {
-        if (infoStream.isEnabled("IW")) {
-          infoStream.message("IW", "hit exception during deleteAll");
+    /* Hold the full flush lock to prevent concurrent commits / NRT reopens from
+     * getting in our way and doing unnecessary work -- if we didn't hold it here,
+     * a concurrent full flush could run against the states we are about to abort. */
+    synchronized (fullFlushLock) {
+      /*
+       * We first abort and trash everything we have in-memory
+       * and keep the thread-states locked; the lockAndAbortAll operation
+       * also guarantees "point in time" semantics, ie. the checkpoint that we need
+       * in terms of the logical happens-before relationship in the DW. So we
+       * abort all in-memory structures.
+       * We also drop the global field numbering during the abort to make
+       * sure it's just like a fresh index.
+       */
+      try {
+        docWriter.lockAndAbortAll();
+        synchronized (this) {
+          try {
+            // Abort any running merges
+            finishMerges(false);
+            // Remove all segments
+            segmentInfos.clear();
+            // Ask deleter to locate unreferenced files & remove them:
+            deleter.checkpoint(segmentInfos, false);
+            /* don't refresh the deleter here since there might
+             * be concurrent indexing requests coming in opening
+             * files on the directory after we called DW#abort();
+             * if we did, those indexing requests might hit FNF exceptions.
+             * We will remove the files incrementally as we go...
+             */
+            // Don't bother saving any changes in our segmentInfos
+            readerPool.dropAll(false);
+            // Mark that the index has changed
+            ++changeCount;
+            segmentInfos.changed();
+            globalFieldNumberMap.clear();
+            success = true;
+          } catch (OutOfMemoryError oom) {
+            handleOOM(oom, "deleteAll");
+          } finally {
+            if (!success) {
+              if (infoStream.isEnabled("IW")) {
+                infoStream.message("IW", "hit exception during deleteAll");
+              }
+            }
+          }
         }
+      } finally {
+        docWriter.unlockAllAfterAbortAll();
       }
     }
   }
@@ -2867,6 +2891,11 @@
   // Ensures only one flush() is actually flushing segments
   // at a time:
   private final Object fullFlushLock = new Object();
+
+  // for assert
+  boolean holdsFullFlushLock() {
+    return Thread.holdsLock(fullFlushLock);
+  }
 
   /**
    * Flush all in-memory buffered updates (adds and deletes)
Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java	(revision 1483748)
+++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java	(working copy)
@@ -25,6 +25,8 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -302,7 +304,70 @@
     modifier.close();
     dir.close();
   }
+
+  public void testDeleteAllNoDeadLock() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    final RandomIndexWriter modifier = new RandomIndexWriter(random(), dir);
+    int numThreads = atLeast(2);
+    Thread[] threads = new Thread[numThreads];
+    final CountDownLatch latch = new CountDownLatch(1);
+    final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      final int offset = i;
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          int id = offset * 1000;
+          int value = 100;
+          try {
+            latch.await();
+            for (int j = 0; j < 1000; j++) {
+              Document doc = new Document();
+              doc.add(newTextField("content", "aaa", Field.Store.NO));
+              doc.add(newStringField("id", String.valueOf(id++), Field.Store.YES));
+              doc.add(newStringField("value", String.valueOf(value), Field.Store.NO));
+              doc.add(new NumericDocValuesField("dv", value));
+              modifier.addDocument(doc);
+              if (VERBOSE) {
+                System.out.println("\tThread[" + offset + "]: add doc: " + id);
+              }
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          } finally {
+            doneLatch.countDown();
+            if (VERBOSE) {
+              System.out.println("\tThread[" + offset + "]: done indexing");
+            }
+          }
+        }
+      };
+      threads[i].start();
+    }
+    latch.countDown();
+    while (!doneLatch.await(1, TimeUnit.MILLISECONDS)) {
+      modifier.deleteAll();
+      if (VERBOSE) {
+        System.out.println("del all");
+      }
+    }
+    modifier.deleteAll(); // one final delete once the indexer threads are done
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    modifier.close();
+    DirectoryReader reader = DirectoryReader.open(dir);
+    assertEquals(0, reader.maxDoc());
+    assertEquals(0, reader.numDocs());
+    assertEquals(0, reader.numDeletedDocs());
+    reader.close();
+    dir.close();
+  }
+
   // test rollback of deleteAll()
   public void testDeleteAllRollback() throws IOException {
     Directory dir = newDirectory();
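
Reviewer note (not part of the patch): the rewritten deleteAll() javadoc above separates dropping the whole index from deleting by query. Below is a minimal, hypothetical sketch of what that difference looks like from user code, written against the Lucene 4.x API this patch targets; the class name and printed output are illustrative only, not anything the patch adds.

// DeleteAllSemanticsSketch.java -- illustrative sketch only, NOT part of this
// patch. Contrasts IndexWriter#deleteAll() with a match-all delete query,
// assuming the Lucene 4.x API at the revision this patch applies to.
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class DeleteAllSemanticsSketch {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(
        Version.LUCENE_43, new StandardAnalyzer(Version.LUCENE_43)));

    Document doc = new Document();
    doc.add(new TextField("content", "aaa", Field.Store.NO));

    // Variant 1: delete-by-query only *marks* documents as deleted; existing
    // segment data and global field metadata generally survive until merges
    // reclaim the space.
    w.addDocument(doc);
    w.deleteDocuments(new MatchAllDocsQuery());
    w.commit();

    // Variant 2: deleteAll() drops buffered documents and all segment
    // information outright -- like reopening with OpenMode.CREATE -- and
    // aborts running merges; it can be rolled back until the next commit().
    w.addDocument(doc);
    w.deleteAll();
    w.commit();

    DirectoryReader r = DirectoryReader.open(dir);
    System.out.println("numDocs=" + r.numDocs() + " maxDoc=" + r.maxDoc()); // 0 / 0
    r.close();
    w.close();
    dir.close();
  }
}

After deleteAll() plus commit() the index is structurally empty: no segments, a fresh global field number map, and reset norms/doc-value semantics. A match-all delete query instead marks documents deleted and leaves space reclamation to later merges.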
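Reviewer note (not part of the patch): lockAndAbortAll() / unlockAllAfterAbortAll() implement a lock-all, abort, unlock-later protocol. Every ThreadState is locked before the in-memory state is trashed, the locks are released immediately only if the abort fails, and on success the caller -- deleteAll() -- releases them in its own finally block. A stripped-down sketch of that protocol, with hypothetical names and plain java.util.concurrent locks standing in for Lucene's ThreadState:

// LockAllSketch.java -- hypothetical sketch only, NOT Lucene code.
import java.util.concurrent.locks.ReentrantLock;

class LockAllSketch {
  private final ReentrantLock[] slots;

  LockAllSketch(int n) {
    slots = new ReentrantLock[n];
    for (int i = 0; i < n; i++) {
      slots[i] = new ReentrantLock();
    }
  }

  /** Locks every slot, then runs the abort action; on failure, unlocks again. */
  void lockAndAbortAll(Runnable abortAction) {
    boolean success = false;
    try {
      for (ReentrantLock slot : slots) {
        slot.lock(); // block until this slot is owned by the current thread
      }
      abortAction.run();
      success = true;
      // on success the slots stay locked; the caller unlocks in its own finally
    } finally {
      if (!success) {
        unlockAllAfterAbortAll(); // never leak locks on a failed abort
      }
    }
  }

  /** Releases whatever this thread holds, ignoring per-slot failures. */
  void unlockAllAfterAbortAll() {
    for (ReentrantLock slot : slots) {
      try {
        if (slot.isHeldByCurrentThread()) {
          slot.unlock();
        }
      } catch (Throwable t) {
        // ignore & keep on unlocking, as the patch does
      }
    }
  }
}

The isHeldByCurrentThread() check matters because a failure can happen part-way through the locking loop, leaving only a prefix of the slots locked; the unlock pass must tolerate that and keep going past individual failures.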