Index: solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
===================================================================
--- solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java	(revision 1151902)
+++ solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java	(working copy)
@@ -16,19 +16,6 @@
  */
 package org.apache.solr.search;
 
-import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.RAMDirectory;
-import org.apache.lucene.util.Version;
 import org.apache.noggit.ObjectBuilder;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
@@ -273,254 +260,4 @@
     }
   }
-
-
-
-
-  IndexReader reader;
-
-  @Test
-  public void testStressLuceneNRT() throws Exception {
-    // update variables
-    final int commitPercent = 10;
-    final int softCommitPercent = 50; // what percent of the commits are soft
-    final int deletePercent = 8;
-    final int deleteByQueryPercent = 4;
-    final int ndocs = 100;
-    int nWriteThreads = 10;
-    final int maxConcurrentCommits = 2;  // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
-    final boolean tombstones = false;
-
-    // query variables
-    final AtomicLong operations = new AtomicLong(10000000);  // number of query operations to perform in total  // TODO: temporarily high due to lack of stability
-    int nReadThreads = 10;
-
-    initModel(ndocs);
-
-    final AtomicInteger numCommitting = new AtomicInteger();
-
-    List<Thread> threads = new ArrayList<Thread>();
-
-    RAMDirectory dir = new RAMDirectory();
-    final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40)));
-    writer.commit();
-    reader = IndexReader.open(dir);
-
-    for (int i=0; i<nWriteThreads; i++) {
-      Thread thread = new Thread("WRITER"+i) {
-        Random rand = new Random(random.nextInt());
-
-        @Override
-        public void run() {
-          try {
-            while (operations.get() > 0) {
-              int oper = rand.nextInt(100);
-
-              if (oper < commitPercent) {
-                if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
-                  Map<Integer,Long> newCommittedModel;
-                  long version;
-                  IndexReader oldReader;
-
-                  synchronized(TestRealTimeGet.this) {
-                    newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
-                    version = snapshotCount++;
-                    oldReader = reader;
-                    oldReader.incRef();  // increment the reference since we will use this for reopening
-                  }
-
-                  IndexReader newReader;
-                  if (rand.nextInt(100) < softCommitPercent) {
-                    // assertU(h.commit("softCommit","true"));
-                    newReader = oldReader.reopen(writer, true);
-                  } else {
-                    // assertU(commit());
-                    writer.commit();
-                    newReader = oldReader.reopen();
-                  }
-
-                  synchronized(TestRealTimeGet.this) {
-                    // install the new reader if it's newest (and check the current version since another reader may have already been installed)
-                    if (newReader.getVersion() > reader.getVersion()) {
-                      reader.decRef();
-                      reader = newReader;
-
-                      // install this snapshot only if it's newer than the current one
-                      if (version >= committedModelClock) {
-                        committedModel = newCommittedModel;
-                        committedModelClock = version;
-                      }
-
-                    } else if (newReader != oldReader) {
-                      // if the same reader, don't decRef.
-                      newReader.decRef();
-                    }
-
-                    oldReader.decRef();
-                  }
-                }
-                numCommitting.decrementAndGet();
-                continue;
-              }
-
-
-              int id = rand.nextInt(ndocs);
-              Object sync = syncArr[id];
-
-              // set the lastId before we actually change it sometimes to try and
-              // uncover more race conditions between writing and reading
-              boolean before = rand.nextBoolean();
-              if (before) {
-                lastId = id;
-              }
-
-              // We can't concurrently update the same document and retain our invariants of increasing values
-              // since we can't guarantee what order the updates will be executed.
-              synchronized (sync) {
-                Long val = model.get(id);
-                long nextVal = Math.abs(val)+1;
-
-                if (oper < commitPercent + deletePercent) {
-                  // assertU("<delete><id>" + id + "</id></delete>");
-
-                  // add tombstone first
-                  if (tombstones) {
-                    Document d = new Document();
-                    d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
-                    d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
-                    writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
-                  }
-
-                  writer.deleteDocuments(new Term("id",Integer.toString(id)));
-                  model.put(id, -nextVal);
-                } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
-                  //assertU("<delete><query>id:" + id + "</query></delete>");
-
-                  // add tombstone first
-                  if (tombstones) {
-                    Document d = new Document();
-                    d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
-                    d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
-                    writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
-                  }
-
-                  writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
-                  model.put(id, -nextVal);
-                } else {
-                  // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
-                  Document d = new Document();
-                  d.add(new Field("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
-                  d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
-                  writer.updateDocument(new Term("id", Integer.toString(id)), d);
-
-                  if (tombstones) {
-                    // remove tombstone after new addition (this should be optional?)
-                    writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
-                  }
-
-                  model.put(id, nextVal);
-                }
-              }
-
-              if (!before) {
-                lastId = id;
-              }
-            }
-          } catch (Exception ex) {
-            throw new RuntimeException(ex);
-          }
-        }
-      };
-
-      threads.add(thread);
-    }
-
-
-    for (int i=0; i<nReadThreads; i++) {
-      Thread thread = new Thread("READER"+i) {
-        Random rand = new Random(random.nextInt());
-
-        @Override
-        public void run() {
-          try {
-            while (operations.decrementAndGet() >= 0) {
-              // bias toward a recently changed doc
-              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
-
-              // when indexing, we update the index, then the model
-              // so when querying, we should first check the model, and then the index
-
-              long val;
-
-              synchronized(TestRealTimeGet.this) {
-                val = committedModel.get(id);
-              }
-
-
-              IndexReader r;
-              synchronized(TestRealTimeGet.this) {
-                r = reader;
-                r.incRef();
-              }
-
-              // sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
-              IndexSearcher searcher = new IndexSearcher(r);
-              Query q = new TermQuery(new Term("id",Integer.toString(id)));
-              TopDocs results = searcher.search(q, 1);
-
-              if (results.totalHits == 0 && tombstones) {
-                // if we couldn't find the doc, look for it's tombstone
-                q = new TermQuery(new Term("id","-"+Integer.toString(id)));
-                results = searcher.search(q, 1);
-                if (results.totalHits == 0) {
-                  if (val == -1L) {
-                    // expected... no doc was added yet
-                    continue;
-                  }
-                  fail("No documents or tombstones found for id " + id + ", expected at least " + val);
-                }
-              }
-
-              if (results.totalHits == 0 && !tombstones) {
-                // nothing to do - we can't tell anything from a deleted doc without tombstones
-              } else {
-                assertEquals(1, results.totalHits);  // we should have found the document, or it's tombstone
-                Document doc = searcher.doc(results.scoreDocs[0].doc);
-                long foundVal = Long.parseLong(doc.get(field));
-                if (foundVal < Math.abs(val)) {
-                  System.out.println("model_val="+val+" foundVal="+foundVal);
-                }
-                assertTrue(foundVal >= Math.abs(val));
-              }
-
-              r.decRef();
-            }
-          }
-          catch (Throwable e) {
-            operations.set(-1L);
-            SolrException.log(log,e);
-            fail(e.toString());
-          }
-        }
-      };
-
-      threads.add(thread);
-    }
-
-
-    for (Thread thread : threads) {
-      thread.start();
-    }
-
-    for (Thread thread : threads) {
-      thread.join();
-    }
-
-    writer.close();
-    reader.close();
-
-  }
-
-
 
 }
Index: lucene/src/test/org/apache/lucene/index/TestStressNRT.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestStressNRT.java	(revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestStressNRT.java	(revision 0)
@@ -0,0 +1,358 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.Version;
+
+// nocommit backport to 3.x
+
+public class TestStressNRT extends LuceneTestCase {
+  volatile IndexReader reader;
+
+  final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
+  Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
+  long snapshotCount;
+  long committedModelClock;
+  volatile int lastId;
+  final String field = "val_l";
+  Object[] syncArr;
+
+  private void initModel(int ndocs) {
+    snapshotCount = 0;
+    committedModelClock = 0;
+    lastId = 0;
+
+    syncArr = new Object[ndocs];
+
+    for (int i=0; i<ndocs; i++) {
+      model.put(i, -1L);
+      syncArr[i] = new Object();
+    }
+    committedModel.putAll(model);
+  }
+
+  public void test() throws Exception {
+    // update variables
+    final int commitPercent = 10;
+    final int softCommitPercent = 50; // what percent of the commits are soft
+    final int deletePercent = 8;
+    final int deleteByQueryPercent = 4;
+    final int ndocs = 100;
+    int nWriteThreads = 10;
+    final int maxConcurrentCommits = 2;  // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
+    final boolean tombstones = false;
+
+    // query variables
+    final AtomicLong operations = new AtomicLong(10000000);  // number of query operations to perform in total
+    int nReadThreads = 10;
+
+    initModel(ndocs);
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+    // nocommit cutover to newDirectory:
+    Directory dir = new RAMDirectory();
+
+    // nocommit cutover to newIWC/RIW, TEST_VERSION_CURRENT
+    // nocommit -- don't wire the MP
+    final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new MockAnalyzer(random)).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES));
+    writer.setInfoStream(VERBOSE ? System.out : null);
+    writer.commit();
+    reader = IndexReader.open(dir);
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.get() > 0) {
+              int oper = rand.nextInt(100);
+
+              if (oper < commitPercent) {
+                if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                  Map<Integer,Long> newCommittedModel;
+                  long version;
+                  IndexReader oldReader;
+
+                  synchronized(TestStressNRT.this) {
+                    newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
+                    version = snapshotCount++;
+                    oldReader = reader;
+                    oldReader.incRef();  // increment the reference since we will use this for reopening
+                  }
+
+                  IndexReader newReader;
+                  // nocommit
+                  if (rand.nextInt(100) < softCommitPercent) {
+                    // assertU(h.commit("softCommit","true"));
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version);
+                    }
+                    newReader = oldReader.reopen(writer, true);
+                  } else {
+                    // assertU(commit());
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version);
+                    }
+                    writer.commit();
+                    // nocommit -- this is reopening an NRT
+                    // reader still?
+                    newReader = oldReader.reopen();
+                  }
+                  oldReader.decRef();
+
+                  synchronized(TestStressNRT.this) {
+                    // install the new reader if it's newest (and check the current version since another reader may have already been installed)
+                    // nocommit -- maybe getVersion is
+                    // buggy?
+                    //System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion());
+
+                    if (newReader.getVersion() > reader.getVersion()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader);
+                      }
+                      reader.decRef();
+                      reader = newReader;
+
+                      // install this snapshot only if it's newer than the current one
+                      if (version >= committedModelClock) {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version);
+                        }
+                        committedModel = newCommittedModel;
+                        committedModelClock = version;
+                      } else {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version);
+                        }
+                      }
+                    } else if (newReader != reader) {
+                      // if the same reader, don't decRef.
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader);
+                      }
+                      newReader.decRef();
+                    }
+                  }
+                }
+                numCommitting.decrementAndGet();
+              } else {
+
+                int id = rand.nextInt(ndocs);
+                Object sync = syncArr[id];
+
+                // set the lastId before we actually change it sometimes to try and
+                // uncover more race conditions between writing and reading
+                boolean before = random.nextBoolean();
+                if (before) {
+                  lastId = id;
+                }
+
+                // We can't concurrently update the same document and retain our invariants of increasing values
+                // since we can't guarantee what order the updates will be executed.
+                synchronized (sync) {
+                  Long val = model.get(id);
+                  long nextVal = Math.abs(val)+1;
+
+                  if (oper < commitPercent + deletePercent) {
+                    // assertU("<delete><id>" + id + "</id></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": delDocs id=" + id);
+                    }
+                    writer.deleteDocuments(new Term("id",Integer.toString(id)));
+                    model.put(id, -nextVal);
+                  } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+                    //assertU("<delete><query>id:" + id + "</query></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": delDocsByQuery id=" + id);
+                    }
+                    writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
+                    model.put(id, -nextVal);
+                  } else {
+                    // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                    Document d = new Document();
+                    // nocommit cutover to newField
+                    d.add(new Field("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                    d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": updateDoc id:" + id + " val=" + nextVal);
+                    }
+                    writer.updateDocument(new Term("id", Integer.toString(id)), d);
+                    if (tombstones) {
+                      // remove tombstone after new addition (this should be optional?)
+                      writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
+                    }
+                    model.put(id, nextVal);
+                  }
+                }
+
+                if (!before) {
+                  lastId = id;
+                }
+              }
+            }
+          } catch (Throwable e) {
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              long val;
+              IndexReader r;
+              synchronized(TestStressNRT.this) {
+                val = committedModel.get(id);
+                r = reader;
+                r.incRef();
+              }
+
+              if (VERBOSE) {
+                System.out.println("TEST: " + Thread.currentThread().getName() + ": search id=" + id + " val=" + val + " r=" + r.getVersion());
+              }
+
+              // sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              IndexSearcher searcher = new IndexSearcher(r);
+              Query q = new TermQuery(new Term("id",Integer.toString(id)));
+              TopDocs results = searcher.search(q, 10);
+
+              if (results.totalHits == 0 && tombstones) {
+                // if we couldn't find the doc, look for its tombstone
+                q = new TermQuery(new Term("id","-"+Integer.toString(id)));
+                results = searcher.search(q, 1);
+                if (results.totalHits == 0) {
+                  if (val == -1L) {
+                    // expected... no doc was added yet
+                    continue;
+                  }
+                  fail("No documents or tombstones found for id " + id + ", expected at least " + val);
+                }
+              }
+
+              if (results.totalHits == 0 && !tombstones) {
+                // nothing to do - we can't tell anything from a deleted doc without tombstones
+              } else {
+                // we should have found the document, or its tombstone
+                if (results.totalHits != 1) {
+                  System.out.println("FAIL: hits id:" + id + " val=" + val);
+                  for(ScoreDoc sd : results.scoreDocs) {
+                    final Document doc = reader.document(sd.doc);
+                    System.out.println("  docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field));
+                  }
+                  fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits);
+                }
+                Document doc = searcher.doc(results.scoreDocs[0].doc);
+                long foundVal = Long.parseLong(doc.get(field));
+                if (foundVal < Math.abs(val)) {
+                  System.out.println("model_val="+val+" foundVal="+foundVal);
+                }
+
+                assertTrue("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r, foundVal >= Math.abs(val));
+              }
+
+              r.decRef();
+            }
+          } catch (Throwable e) {
+            operations.set(-1L);
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    writer.close();
+    reader.close();
+  }
+}

Property changes on: lucene/src/test/org/apache/lucene/index/TestStressNRT.java
___________________________________________________________________
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java	(revision 1151902)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java	(working copy)
@@ -18,6 +18,7 @@
  */
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -372,9 +373,10 @@
       // we do another full flush
       DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
       documentsWriter.deleteQueue = newQueue;
+      //System.out.println(Thread.currentThread().getName() + ": swap delete queue old=" + flushingQueue + " new=" + newQueue);
+
     }
-    final Iterator<ThreadState> allActiveThreads = perThreadPool
-      .getActivePerThreadsIterator();
+    final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
     final ArrayList<DocumentsWriterPerThread> toFlush = new ArrayList<DocumentsWriterPerThread>();
     while (allActiveThreads.hasNext()) {
       final ThreadState next = allActiveThreads.next();
@@ -426,9 +428,37 @@
       pruneBlockedQueue(flushingQueue);
       assert assertBlockedFlushes(documentsWriter.deleteQueue);
       flushQueue.addAll(toFlush);
+      flushQueue.addAll(externalToFlush);
+      externalToFlush.clear();
       stallControl.updateStalled(this);
     }
   }
+
+  private final List<DocumentsWriterPerThread> externalToFlush = new ArrayList<DocumentsWriterPerThread>();
+
+  // nocommit -- assert that we hold the lock
+  void addFlushableState(ThreadState perThread) {
+    //System.out.println(Thread.currentThread().getName() + ": add external flushable state");
+    final DocumentsWriterPerThread dwpt = perThread.perThread;
+    assert perThread.isActive();
+    assert fullFlush;
+    assert dwpt.deleteQueue != documentsWriter.deleteQueue;
+    if (dwpt.getNumDocsInRAM() > 0) {
+      synchronized(this) {
+        setFlushPending(perThread);
+        final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
+        assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+        assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+        externalToFlush.add(flushingDWPT);
+      }
+    } else {
+      if (closed) {
+        perThread.resetWriter(null); // make this state inactive
+      } else {
+        dwpt.initialize();
+      }
+    }
+  }
 
   /**
    * Prunes the blockedQueue by removing all DWPT that are associated with the given flush queue.
Index: lucene/src/java/org/apache/lucene/index/DirectoryReader.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DirectoryReader.java	(revision 1151902)
+++ lucene/src/java/org/apache/lucene/index/DirectoryReader.java	(working copy)
@@ -309,7 +309,7 @@
     buffer.append('(');
     final String segmentsFile = segmentInfos.getCurrentSegmentFileName();
     if (segmentsFile != null) {
-      buffer.append(segmentsFile);
+      buffer.append(segmentsFile).append(":").append(segmentInfos.getVersion());
     }
     if (writer != null) {
       buffer.append(":nrt");
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 1151902)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -324,6 +324,7 @@
                              final Term delTerm) throws CorruptIndexException, IOException {
     boolean maybeMerge = preUpdate();
 
+    // nocommit must also loop & flush if stale here:
     final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
 
     final DocumentsWriterPerThread flushingDWPT;
@@ -356,7 +357,17 @@
 
     boolean maybeMerge = preUpdate();
 
-    final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+    ThreadState perThread;
+    while (true) {
+      perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+      if (perThread.perThread.deleteQueue != deleteQueue) {
+        flushControl.addFlushableState(perThread);
+        perThread.unlock();
+      } else {
+        break;
+      }
+    }
+
     final DocumentsWriterPerThread flushingDWPT;
 
     try {