Index: solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java =================================================================== --- solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (revision 1152848) +++ solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (working copy) @@ -16,26 +16,12 @@ */ package org.apache.solr.search; -import org.apache.lucene.analysis.core.WhitespaceAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.RAMDirectory; -import org.apache.lucene.util.Version; import org.apache.noggit.ObjectBuilder; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.common.SolrException; import org.apache.solr.request.SolrQueryRequest; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.Ignore; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -123,7 +109,7 @@ // query variables final int percentRealtimeQuery = 0; // realtime get is not implemented yet - final AtomicLong operations = new AtomicLong(0); // number of query operations to perform in total // TODO: once lucene level passes, we can move on to the solr level + final AtomicLong operations = new AtomicLong(1000); // number of query operations to perform in total // TODO: once lucene level passes, we can move on to the solr level int nReadThreads = 10; initModel(ndocs); @@ -272,257 +258,5 @@ for (Thread thread : threads) { thread.join(); } - } - - - - - IndexReader reader; - - @Ignore - @Test - public void testStressLuceneNRT() throws Exception { - // update variables - final int commitPercent = 10; - final int softCommitPercent = 50; // what percent of the commits are soft - final int deletePercent = 8; - final int deleteByQueryPercent = 4; - final int ndocs = 100; - int nWriteThreads = 10; - final int maxConcurrentCommits = 2; // number of committers at a time... 
needed if we want to avoid commit errors due to exceeding the max - final boolean tombstones = false; - - // query variables - final AtomicLong operations = new AtomicLong(0); // number of query operations to perform in total // TODO: temporarily high due to lack of stability - int nReadThreads = 10; - - initModel(ndocs); - - final AtomicInteger numCommitting = new AtomicInteger(); - - List threads = new ArrayList(); - - RAMDirectory dir = new RAMDirectory(); - final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40))); - writer.commit(); - reader = IndexReader.open(dir); - - for (int i=0; i 0) { - int oper = rand.nextInt(100); - - if (oper < commitPercent) { - if (numCommitting.incrementAndGet() <= maxConcurrentCommits) { - Map newCommittedModel; - long version; - IndexReader oldReader; - - synchronized(TestRealTimeGet.this) { - newCommittedModel = new HashMap(model); // take a snapshot - version = snapshotCount++; - oldReader = reader; - oldReader.incRef(); // increment the reference since we will use this for reopening - } - - IndexReader newReader; - if (rand.nextInt(100) < softCommitPercent) { - // assertU(h.commit("softCommit","true")); - newReader = oldReader.reopen(writer, true); - } else { - // assertU(commit()); - writer.commit(); - newReader = oldReader.reopen(); - } - - synchronized(TestRealTimeGet.this) { - // install the new reader if it's newest (and check the current version since another reader may have already been installed) - if (newReader.getVersion() > reader.getVersion()) { - reader.decRef(); - reader = newReader; - - // install this snapshot only if it's newer than the current one - if (version >= committedModelClock) { - committedModel = newCommittedModel; - committedModelClock = version; - } - - } else if (newReader != oldReader) { - // if the same reader, don't decRef. - newReader.decRef(); - } - - oldReader.decRef(); - } - } - numCommitting.decrementAndGet(); - continue; - } - - - int id = rand.nextInt(ndocs); - Object sync = syncArr[id]; - - // set the lastId before we actually change it sometimes to try and - // uncover more race conditions between writing and reading - boolean before = rand.nextBoolean(); - if (before) { - lastId = id; - } - - // We can't concurrently update the same document and retain our invariants of increasing values - // since we can't guarantee what order the updates will be executed. 
- synchronized (sync) { - Long val = model.get(id); - long nextVal = Math.abs(val)+1; - - if (oper < commitPercent + deletePercent) { - // assertU("" + id + ""); - - // add tombstone first - if (tombstones) { - Document d = new Document(); - d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS)); - d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO)); - writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d); - } - - writer.deleteDocuments(new Term("id",Integer.toString(id))); - model.put(id, -nextVal); - } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) { - //assertU("id:" + id + ""); - - // add tombstone first - if (tombstones) { - Document d = new Document(); - d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS)); - d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO)); - writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d); - } - - writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id)))); - model.put(id, -nextVal); - } else { - // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal))); - Document d = new Document(); - d.add(new Field("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS)); - d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO)); - writer.updateDocument(new Term("id", Integer.toString(id)), d); - - if (tombstones) { - // remove tombstone after new addition (this should be optional?) - writer.deleteDocuments(new Term("id","-"+Integer.toString(id))); - } - - model.put(id, nextVal); - } - } - - if (!before) { - lastId = id; - } - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - }; - - threads.add(thread); - } - - - for (int i=0; i= 0) { - // bias toward a recently changed doc - int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs); - - // when indexing, we update the index, then the model - // so when querying, we should first check the model, and then the index - - long val; - - synchronized(TestRealTimeGet.this) { - val = committedModel.get(id); - } - - - IndexReader r; - synchronized(TestRealTimeGet.this) { - r = reader; - r.incRef(); - } - - // sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true"); - IndexSearcher searcher = new IndexSearcher(r); - Query q = new TermQuery(new Term("id",Integer.toString(id))); - TopDocs results = searcher.search(q, 1); - - if (results.totalHits == 0 && tombstones) { - // if we couldn't find the doc, look for it's tombstone - q = new TermQuery(new Term("id","-"+Integer.toString(id))); - results = searcher.search(q, 1); - if (results.totalHits == 0) { - if (val == -1L) { - // expected... 
no doc was added yet - continue; - } - fail("No documents or tombstones found for id " + id + ", expected at least " + val); - } - } - - if (results.totalHits == 0 && !tombstones) { - // nothing to do - we can't tell anything from a deleted doc without tombstones - } else { - assertEquals(1, results.totalHits); // we should have found the document, or it's tombstone - Document doc = searcher.doc(results.scoreDocs[0].doc); - long foundVal = Long.parseLong(doc.get(field)); - if (foundVal < Math.abs(val)) { - System.out.println("model_val="+val+" foundVal="+foundVal); - } - assertTrue(foundVal >= Math.abs(val)); - } - - r.decRef(); - } - } - catch (Throwable e) { - operations.set(-1L); - SolrException.log(log,e); - fail(e.toString()); - } - } - }; - - threads.add(thread); - } - - - for (Thread thread : threads) { - thread.start(); - } - - for (Thread thread : threads) { - thread.join(); - } - - writer.close(); - reader.close(); - - } - - - } Index: lucene/CHANGES.txt =================================================================== --- lucene/CHANGES.txt (revision 1152848) +++ lucene/CHANGES.txt (working copy) @@ -535,6 +535,10 @@ suppressed exceptions in the original exception, so stack trace will contain them. (Uwe Schindler) +* LUCENE-3348: Fix thread safety hazards in IndexWriter that could + rarely cause deletions to be incorrectly applied. (Yonik Seeley, + Simon Willnauer, Mike McCandless) + New Features * LUCENE-3290: Added FieldInvertState.numUniqueTerms Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java (revision 1152848) +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java (working copy) @@ -28,7 +28,6 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IOContext; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.LuceneTestCase; @@ -474,6 +473,7 @@ setMergePolicy(newLogMergePolicy(2)) ); _TestUtil.keepFullyDeletedSegments(w); + w.setInfoStream(VERBOSE ? System.out : null); Document doc = new Document(); doc.add(newField("f", "doctor who", Field.Store.YES, Field.Index.ANALYZED)); @@ -489,11 +489,18 @@ dir.failOn(ftdm); try { + if (VERBOSE) { + System.out.println("\nTEST: now commit"); + } w.commit(); fail("fake disk full IOExceptions not hit"); } catch (IOException ioe) { + if (VERBOSE) { + System.out.println("\nTEST: exc"); + ioe.printStackTrace(System.out); + } // expected - assertTrue(ftdm.didFail1); + assertTrue(ftdm.didFail1 || ftdm.didFail2); } _TestUtil.checkIndex(dir); ftdm.clearDoFail(); Index: lucene/src/test/org/apache/lucene/index/TestStressNRT.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestStressNRT.java (revision 0) +++ lucene/src/test/org/apache/lucene/index/TestStressNRT.java (revision 0) @@ -0,0 +1,396 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.Version; +import org.apache.lucene.util._TestUtil; + +public class TestStressNRT extends LuceneTestCase { + volatile IndexReader reader; + + final ConcurrentHashMap model = new ConcurrentHashMap(); + Map committedModel = new HashMap(); + long snapshotCount; + long committedModelClock; + volatile int lastId; + final String field = "val_l"; + Object[] syncArr; + + private void initModel(int ndocs) { + snapshotCount = 0; + committedModelClock = 0; + lastId = 0; + + syncArr = new Object[ndocs]; + + for (int i=0; i threads = new ArrayList(); + + Directory dir = newDirectory(); + + // nocommit no MP + final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES)); + //final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))); + writer.setDoRandomOptimizeAssert(false); + writer.w.setInfoStream(VERBOSE ? 
System.out : null); + writer.commit(); + reader = IndexReader.open(dir); + + for (int i=0; i 0) { + int oper = rand.nextInt(100); + + if (oper < commitPercent) { + if (numCommitting.incrementAndGet() <= maxConcurrentCommits) { + Map newCommittedModel; + long version; + IndexReader oldReader; + + synchronized(TestStressNRT.this) { + newCommittedModel = new HashMap(model); // take a snapshot + version = snapshotCount++; + oldReader = reader; + oldReader.incRef(); // increment the reference since we will use this for reopening + } + + IndexReader newReader; + if (rand.nextInt(100) < softCommitPercent) { + // assertU(h.commit("softCommit","true")); + if (random.nextBoolean()) { + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": call writer.getReader"); + } + newReader = writer.getReader(true); + } else { + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version); + } + newReader = oldReader.reopen(writer.w, true); + } + } else { + // assertU(commit()); + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version); + } + writer.commit(); + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": now reopen after commit"); + } + newReader = oldReader.reopen(); + } + + // Code below assumes newReader comes w/ + // extra ref: + if (newReader == oldReader) { + newReader.incRef(); + } + + oldReader.decRef(); + + synchronized(TestStressNRT.this) { + // install the new reader if it's newest (and check the current version since another reader may have already been installed) + //System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion()); + assert newReader.getRefCount() > 0; + assert reader.getRefCount() > 0; + if (newReader.getVersion() > reader.getVersion()) { + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader); + } + reader.decRef(); + reader = newReader; + + // Silly: forces fieldInfos to be + // loaded so we don't hit IOE on later + // reader.toString + newReader.toString(); + + // install this snapshot only if it's newer than the current one + if (version >= committedModelClock) { + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version); + } + committedModel = newCommittedModel; + committedModelClock = version; + } else { + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version); + } + } + } else { + // if the same reader, don't decRef. + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader); + } + newReader.decRef(); + } + } + } + numCommitting.decrementAndGet(); + } else { + + int id = rand.nextInt(ndocs); + Object sync = syncArr[id]; + + // set the lastId before we actually change it sometimes to try and + // uncover more race conditions between writing and reading + boolean before = random.nextBoolean(); + if (before) { + lastId = id; + } + + // We can't concurrently update the same document and retain our invariants of increasing values + // since we can't guarantee what order the updates will be executed. 
+ synchronized (sync) { + Long val = model.get(id); + long nextVal = Math.abs(val)+1; + + if (oper < commitPercent + deletePercent) { + // assertU("" + id + ""); + + // add tombstone first + if (tombstones) { + Document d = new Document(); + d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS)); + d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO)); + writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d); + } + + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": term delDocs id:" + id + " nextVal=" + nextVal); + } + writer.deleteDocuments(new Term("id",Integer.toString(id))); + model.put(id, -nextVal); + } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) { + //assertU("id:" + id + ""); + + // add tombstone first + if (tombstones) { + Document d = new Document(); + d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS)); + d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO)); + writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d); + } + + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": query delDocs id:" + id + " nextVal=" + nextVal); + } + writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id)))); + model.put(id, -nextVal); + } else { + // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal))); + Document d = new Document(); + d.add(newField("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS)); + d.add(newField(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO)); + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": u id:" + id + " val=" + nextVal); + } + writer.updateDocument(new Term("id", Integer.toString(id)), d); + if (tombstones) { + // remove tombstone after new addition (this should be optional?) + writer.deleteDocuments(new Term("id","-"+Integer.toString(id))); + } + model.put(id, nextVal); + } + } + + if (!before) { + lastId = id; + } + } + } + } catch (Throwable e) { + System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception"); + e.printStackTrace(System.out); + throw new RuntimeException(e); + } + } + }; + + threads.add(thread); + } + + for (int i=0; i= 0) { + // bias toward a recently changed doc + int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs); + + // when indexing, we update the index, then the model + // so when querying, we should first check the model, and then the index + + long val; + IndexReader r; + synchronized(TestStressNRT.this) { + val = committedModel.get(id); + r = reader; + r.incRef(); + } + + if (VERBOSE) { + System.out.println("TEST: " + Thread.currentThread().getName() + ": s id=" + id + " val=" + val + " r=" + r.getVersion()); + } + + // sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true"); + IndexSearcher searcher = new IndexSearcher(r); + Query q = new TermQuery(new Term("id",Integer.toString(id))); + TopDocs results = searcher.search(q, 10); + + if (results.totalHits == 0 && tombstones) { + // if we couldn't find the doc, look for its tombstone + q = new TermQuery(new Term("id","-"+Integer.toString(id))); + results = searcher.search(q, 1); + if (results.totalHits == 0) { + if (val == -1L) { + // expected... 
no doc was added yet + r.decRef(); + continue; + } + fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r); + } + } + + if (results.totalHits == 0 && !tombstones) { + // nothing to do - we can't tell anything from a deleted doc without tombstones + } else { + // we should have found the document, or its tombstone + if (results.totalHits != 1) { + System.out.println("FAIL: hits id:" + id + " val=" + val); + for(ScoreDoc sd : results.scoreDocs) { + final Document doc = r.document(sd.doc); + System.out.println(" docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field)); + } + fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits); + } + Document doc = searcher.doc(results.scoreDocs[0].doc); + long foundVal = Long.parseLong(doc.get(field)); + if (foundVal < Math.abs(val)) { + fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r); + } + } + + r.decRef(); + } + } catch (Throwable e) { + operations.set(-1L); + System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception"); + e.printStackTrace(System.out); + throw new RuntimeException(e); + } + } + }; + + threads.add(thread); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + writer.close(); + if (VERBOSE) { + System.out.println("TEST: close reader=" + reader); + } + reader.close(); + dir.close(); + } +} Property changes on: lucene/src/test/org/apache/lucene/index/TestStressNRT.java ___________________________________________________________________ Added: svn:eol-style + native Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java (revision 1152848) +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java (working copy) @@ -90,7 +90,10 @@ break; } } catch (Throwable t) { - //t.printStackTrace(System.out); + if (VERBOSE) { + System.out.println("TEST: expected exc:"); + t.printStackTrace(System.out); + } if (noErrors) { System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:"); t.printStackTrace(System.out); @@ -157,6 +160,10 @@ int NUM_THREADS = 3; int numIterations = TEST_NIGHTLY ? 7 : 3; for(int iter=0;iter 0) { + synchronized(this) { + if (!perThread.flushPending) { + setFlushPending(perThread); + } + final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread); + assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents"; + assert dwpt == flushingDWPT : "flushControl returned different DWPT"; + externalToFlush.add(flushingDWPT); + } + } else { + if (closed) { + perThread.resetWriter(null); // make this state inactive + } else { + dwpt.initialize(); + } + } + } /** * Prunes the blockedQueue by removing all DWPT that are associated with the given flush queue. Index: lucene/src/java/org/apache/lucene/index/DirectoryReader.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DirectoryReader.java (revision 1152848) +++ lucene/src/java/org/apache/lucene/index/DirectoryReader.java (working copy) @@ -50,7 +50,8 @@ protected CodecProvider codecs; - IndexWriter writer; + // nocommit needed? 
+  volatile IndexWriter writer;
   private IndexDeletionPolicy deletionPolicy;
   private Lock writeLock;
@@ -309,7 +310,7 @@
     buffer.append('(');
     final String segmentsFile = segmentInfos.getCurrentSegmentFileName();
     if (segmentsFile != null) {
-      buffer.append(segmentsFile);
+      buffer.append(segmentsFile).append(":").append(segmentInfos.getVersion());
     }
     if (writer != null) {
       buffer.append(":nrt");
     }
Index: lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/BufferedDeletes.java	(revision 1152848)
+++ lucene/src/java/org/apache/lucene/index/BufferedDeletes.java	(working copy)
@@ -93,7 +93,7 @@
     } else {
       String s = "gen=" + gen;
       if (numTermDeletes.get() != 0) {
-        s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
+        s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ") terms=" + terms.keySet();
       }
       if (queries.size() != 0) {
         s += " " + queries.size() + " deleted queries";
       }
Index: lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java	(revision 1152848)
+++ lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java	(working copy)
@@ -287,6 +287,7 @@
       final Integer docIDUpto = segDeletes.get(new Term(fieldName, text));
       if (docIDUpto != null) {
         delDocLimit = docIDUpto;
+        System.out.println(Thread.currentThread().getName() + ": delete term " + fieldName + ":" + text.utf8ToString() + " during flush");
       } else {
         delDocLimit = 0;
       }
@@ -356,6 +357,9 @@
           state.liveDocs.invertAll();
         }
         state.liveDocs.clear(docID);
+        System.out.println(Thread.currentThread().getName() + ": delete docID=" + docID + " limit=" + delDocLimit + " term=" + fieldName + ":" + text.utf8ToString());
+      } else {
+        System.out.println(Thread.currentThread().getName() + ": skip delete docID=" + docID + " limit=" + delDocLimit + " term=" + fieldName + ":" + text.utf8ToString());
       }
 
       if (currentFieldIndexOptions != IndexOptions.DOCS_ONLY) {
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 1152848)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -165,7 +165,7 @@
   }
 
   private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
-    if (deleteQueue != null) {
+    if (deleteQueue != null && !flushControl.isFullFlush()) {
       synchronized (ticketQueue) {
         // Freeze and insert the delete flush ticket in the queue
         ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
       }
@@ -320,11 +320,29 @@
     return maybeMerge;
   }
 
+  private ThreadState getThreadState() {
+    while (true) {
+      final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+      if (perThread.perThread != null && perThread.perThread.deleteQueue != deleteQueue) {
+        // There is a flush-all in process and this DWPT is
+        // now stale -- enroll it for flush and try for
+        // another DWPT:
+        try {
+          flushControl.addFlushableState(perThread);
+        } finally {
+          perThread.unlock();
+        }
+      } else {
+        return perThread;
+      }
+    }
+  }
+
   boolean updateDocuments(final Iterable docs, final Analyzer analyzer, final Term delTerm) throws CorruptIndexException, IOException {
     boolean maybeMerge = preUpdate();
 
-    final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+    final ThreadState perThread = getThreadState();
     final DocumentsWriterPerThread flushingDWPT;
 
     try {
@@ -356,7 +374,8 @@
     boolean maybeMerge = preUpdate();
 
-    final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+    final ThreadState perThread = getThreadState();
+
     final DocumentsWriterPerThread flushingDWPT;
 
     try {
@@ -513,6 +532,7 @@
     assert newSegment != null;
     final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
     final BufferedDeletes deletes = newSegment.segmentDeletes;
+    System.out.println(Thread.currentThread().getName() + ": publishFlushedSegment seg-private deletes=" + deletes);
     FrozenBufferedDeletes packet = null;
     if (deletes != null && deletes.any()) {
       // Segment private delete
@@ -542,6 +562,7 @@
   final boolean flushAllThreads() throws IOException {
     final DocumentsWriterDeleteQueue flushingDeleteQueue;
+    System.out.println(Thread.currentThread().getName() + " startFullFlush");
 
     synchronized (this) {
       flushingDeleteQueue = deleteQueue;
@@ -564,6 +585,8 @@
       // If a concurrent flush is still in flight wait for it
       flushControl.waitForFlush();
       if (!anythingFlushed) { // apply deletes if we did not flush any document
+        // nocommit -- not safe?
+        System.out.println(Thread.currentThread().getName() + ": flush naked frozen global deletes");
         synchronized (ticketQueue) {
           ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
         }
@@ -576,6 +599,7 @@
   }
 
   final void finishFullFlush(boolean success) {
+    System.out.println(Thread.currentThread().getName() + " finishFullFlush success=" + success);
     assert setFlushingDeleteQueue(null);
     if (success) {
       // Release the flush lock
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java	(revision 1152848)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -750,7 +750,7 @@
       SegmentReader reader = readerPool.getIfExists(info);
       try {
         if (reader != null) {
-          return reader.numDeletedDocs();
+          return Math.max(reader.numDeletedDocs(), info.getDelCount());
         } else {
           return info.getDelCount();
         }
@@ -2338,6 +2338,7 @@
                                    FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
     // Lock order IW -> BDS
     synchronized (bufferedDeletesStream) {
+      System.out.println(Thread.currentThread().getName() + ": IW.publishFlushedSegment");
       if (globalPacket != null && globalPacket.any()) {
         bufferedDeletesStream.push(globalPacket);
       }
@@ -2351,6 +2352,7 @@
         // generation right away
         nextGen = bufferedDeletesStream.getNextGen();
       }
+      System.out.println(Thread.currentThread().getName() + ": set newSegment delGen=" + nextGen);
       newSegment.setBufferedDeletesGen(nextGen);
       segmentInfos.add(newSegment);
       checkpoint();
@@ -2714,12 +2716,73 @@
     if (pendingCommit != null)
       throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
 
-    if (infoStream != null)
+    if (infoStream != null) {
       message("prepareCommit: flush");
+    }
 
-    flush(true, true);
+    doBeforeFlush();
+    assert testPoint("startDoFlush");
+    SegmentInfos toCommit = null;
+    boolean anySegmentsFlushed = false;
 
-    startCommit(commitUserData);
+    // This is copied from doFlush, except it's modified to
+    // clone & incRef the flushed SegmentInfos inside the
+    // sync block:
+    boolean success = false;
+
+    try {
+
+      if (infoStream != null) {
+        message("  start flush in prepareCommit");
+        message("  index before flush " + segString());
+      }
+
+      synchronized (fullFlushLock) {
+        try {
+          anySegmentsFlushed = docWriter.flushAllThreads();
+          success = true;
+        } finally {
+          docWriter.finishFullFlush(success);
+        }
+
+        success = false;
+
+        synchronized(this) {
+          maybeApplyDeletes(true);
+          doAfterFlush();
+          if (!anySegmentsFlushed) {
+            // flushCount is incremented in flushAllThreads
+            flushCount.incrementAndGet();
+          }
+
+          readerPool.commit(segmentInfos);
+
+          toCommit = (SegmentInfos) segmentInfos.clone();
+
+          pendingCommitChangeCount = changeCount;
+
+          // This protects the segmentInfos we are now going
+          // to commit.  This is important in case, eg, while
+          // we are trying to sync all referenced files, a
+          // merge completes which would otherwise have
+          // removed the files we are now syncing.
+          deleter.incRef(toCommit, false);
+          success = true;
+        }
+      }
+    } catch (OutOfMemoryError oom) {
+      handleOOM(oom, "prepareCommit");
+    } finally {
+      if (!success && infoStream != null) {
+        message("hit exception during prepareCommit");
+      }
+    }
+
+    if (anySegmentsFlushed) {
+      maybeMerge();
+    }
+
+    startCommit(toCommit, commitUserData);
   }
 
   // Used only by commit, below; lock order is commitLock -> IW
@@ -2910,13 +2973,12 @@
     } else if (infoStream != null) {
       message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
     }
-
   }
 
   final synchronized void applyAllDeletes() throws IOException {
     flushDeletesCount.incrementAndGet();
-    final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
-      .applyDeletes(readerPool, segmentInfos.asList());
+    final BufferedDeletesStream.ApplyDeletesResult result;
+    result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos.asList());
     if (result.anyDeletes) {
       checkpoint();
     }
@@ -3808,7 +3870,7 @@
    * if it wasn't already.  If that succeeds, then we
    * prepare a new segments_N file but do not fully commit
    * it. */
-  private void startCommit(Map commitUserData) throws IOException {
+  private void startCommit(final SegmentInfos toSync, final Map commitUserData) throws IOException {
 
     assert testPoint("startStartCommit");
     assert pendingCommit == null;
@@ -3819,44 +3881,31 @@
 
     try {
 
-      if (infoStream != null)
+      if (infoStream != null) {
        message("startCommit(): start");
+      }
 
-      final SegmentInfos toSync;
-      final long myChangeCount;
-
       synchronized(this) {
 
        assert lastCommitChangeCount <= changeCount;
-        myChangeCount = changeCount;
 
-        if (changeCount == lastCommitChangeCount) {
-          if (infoStream != null)
+        if (pendingCommitChangeCount == lastCommitChangeCount) {
+          if (infoStream != null) {
            message("  skip startCommit(): no changes pending");
+          }
+          deleter.decRef(toSync);
          return;
        }
 
-        // First, we clone & incref the segmentInfos we intend
-        // to sync, then, without locking, we sync() all files
-        // referenced by toSync, in the background.
+        if (infoStream != null) {
+          message("startCommit index=" + segString(toSync) + " changeCount=" + changeCount);
+        }
 
-        if (infoStream != null)
-          message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
-
-        readerPool.commit(segmentInfos);
-        toSync = (SegmentInfos) segmentInfos.clone();
-
        assert filesExist(toSync);
 
-        if (commitUserData != null)
+        if (commitUserData != null) {
          toSync.setUserData(commitUserData);
-
-        // This protects the segmentInfos we are now going
-        // to commit.  This is important in case, eg, while
-        // we are trying to sync all referenced files, a
-        // merge completes which would otherwise have
-        // removed the files we are now syncing.
-        deleter.incRef(toSync, false);
+        }
      }
 
      assert testPoint("midStartCommit");
@@ -3881,19 +3930,18 @@
        // an exception)
        toSync.prepareCommit(directory);
 
+        pendingCommitSet = true;
        pendingCommit = toSync;
-        pendingCommitSet = true;
-        pendingCommitChangeCount = myChangeCount;
      }
 
-      if (infoStream != null)
+      if (infoStream != null) {
        message("done all syncs");
+      }
 
      assert testPoint("midStartCommitSuccess");
 
    } finally {
      synchronized(this) {
-
        // Have our master segmentInfos record the
        // generations we just prepared.  We do this
        // on error or success so we don't
@@ -3905,6 +3953,7 @@
            message("hit exception committing segments file");
          }
 
+          // Hit exception
          deleter.decRef(toSync);
        }
      }
Index: lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java	(revision 1152848)
+++ lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java	(working copy)
@@ -447,7 +447,7 @@
  public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
 
    if (infoStream != null) {
-      message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+      message("now checkpoint \"" + segmentInfos.toString(directory) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
    }
 
    // Try again now to delete any previously un-deletable