Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java Wed Jun 08 17:58:33 2011 -0400 @@ -0,0 +1,401 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.ThreadInterruptedException; + +// TODO +// - we could make this work also w/ "normal" reopen/commit? +// - add "mustSeeDeletes" here, after lucene gets it +// - maybe we own indexing threads, and maintain queue.. +// - merges segment warmer +// - tune up the max merges/threads + +/** + * Utility class to manage opening near-real-time readers on + * a certain schedule, with control over which indexing + * changes must be visible when searching. + * + * @lucene.experimental +*/ + +public class NRTManager implements Closeable { + private final IndexWriter writer; + private final ReopenThread reopenThread; + private final ExecutorService es; + private final AtomicLong indexingGen; + private final AtomicLong searchingGen; + private volatile IndexSearcher currentSearcher; + + private final AtomicLong noDeletesSearchingGen; + private volatile IndexSearcher noDeletesCurrentSearcher; + + protected class ReopenThread extends Thread { + private final long targetMaxStaleNS; + private final long targetMinStaleNS; + private boolean finish; + private boolean waitingApplyDeletes; + private long waitingGen; + + public ReopenThread(double targetMaxStaleSec, double targetMinStaleSec) { + this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec); + this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec); + } + + public synchronized void finish() { + //System.out.println("NRT: set finish"); + this.finish = true; + notify(); + try { + join(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + + public synchronized void waiting(boolean applyDeletes, long targetGen) { + waitingApplyDeletes |= applyDeletes; + waitingGen = Math.max(waitingGen, targetGen); + notify(); + //System.out.println(Thread.currentThread().getName() + ": force wakeup waitingGen=" + waitingGen + " applyDeletes=" + applyDeletes + " waitingApplyDeletes=" + waitingApplyDeletes); + } + + @Override + public void run() { + // TODO: maybe use private thread ticktock timer, in + // case clock shift messes up nanoTime? + long lastReopenStartNS = System.nanoTime(); + + //System.out.println("reopen: start"); + try { + while (true) { + + final boolean doApplyDeletes; + + boolean hasWaiting = false; + + synchronized(this) { + // TODO: try to guestimate how long reopen might + // take based on past data? + + while (!finish) { + //System.out.println("reopen: cycle"); + + // True if we have someone waiting for reopen'd searcher: + hasWaiting = waitingApplyDeletes ? waitingGen > searchingGen.get() : waitingGen > noDeletesSearchingGen.get(); + final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); + + final long sleepNS = nextReopenStartNS - System.nanoTime(); + + if (sleepNS > 0) { + //System.out.println("reopen: sleep " + (sleepNS/1000000.0) + " ms (hasWaiting=" + hasWaiting + ")"); + try { + wait(sleepNS/1000000, (int) (sleepNS%1000000)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + //System.out.println("NRT: set finish on interrupt"); + finish = true; + break; + } + } else { + break; + } + } + + if (finish) { + //System.out.println("reopen: finish"); + return; + } + + doApplyDeletes = hasWaiting ? waitingApplyDeletes : true; + waitingApplyDeletes = false; + //System.out.println("reopen: start hasWaiting=" + hasWaiting); + } + + lastReopenStartNS = System.nanoTime(); + try { + //final long t0 = System.nanoTime(); + reopen(doApplyDeletes); + //System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec"); + } catch (IOException ioe) { + //System.out.println(Thread.currentThread().getName() + ": IOE"); + //ioe.printStackTrace(); + throw new RuntimeException(ioe); + } + } + } catch (Throwable t) { + //System.out.println("REOPEN EXC"); + //t.printStackTrace(System.out); + throw new RuntimeException(t); + } + } + } + + /** + Create new NRTManager. + @param writer IndexWriter to open near-real-time + readers + @param targetMaxStaleSec Maximum time until a new + reader must be opened; this sets the upper bound + on how slowly reopens may occur + @param targetMinStaleSec Mininum time until a new + reader can be opened; this sets the lower bound + on how quickly reopens may occur, when a caller + is waiting for a specific indexing change to be + reflected. + @param es ExecutorService to pass to the IndexSearcher + */ + + public NRTManager(IndexWriter writer, double targetMaxStaleSec, double targetMinStaleSec, ExecutorService es) throws IOException { + + if (targetMaxStaleSec < targetMinStaleSec) { + throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")"); + } + + this.writer = writer; + this.es = es; + indexingGen = new AtomicLong(1); + searchingGen = new AtomicLong(-1); + noDeletesSearchingGen = new AtomicLong(-1); + + // Create initial reader: + swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true); + + reopenThread = launchReopenThread(targetMaxStaleSec, targetMinStaleSec); + } + + protected ReopenThread launchReopenThread(double targetMaxStaleSec, double targetMinStaleSec) { + ReopenThread thread = new ReopenThread(targetMaxStaleSec, targetMinStaleSec); + thread.setName("NRTManager Reopen Thread"); + thread.setDaemon(true); + thread.setPriority(Thread.currentThread().getPriority()+2); + thread.start(); + return thread; + } + + public long updateDocument(Term t, Document d, Analyzer a) throws IOException { + writer.updateDocument(t, d, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocument(Term t, Document d) throws IOException { + writer.updateDocument(t, d); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocuments(Term t, Iterable docs, Analyzer a) throws IOException { + writer.updateDocuments(t, docs, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocuments(Term t, Iterable docs) throws IOException { + writer.updateDocuments(t, docs); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Term t) throws IOException { + writer.deleteDocuments(t); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Query q) throws IOException { + writer.deleteDocuments(q); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocument(Document d, Analyzer a) throws IOException { + writer.addDocument(d, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocuments(Iterable docs, Analyzer a) throws IOException { + writer.addDocuments(docs, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocument(Document d) throws IOException { + writer.addDocument(d); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocuments(Iterable docs) throws IOException { + writer.addDocuments(docs); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + /** Returns the most current searcher. If you require a + * certain indexing generation be visible in the returned + * searcher, call {@link getSearcher(long)} + * instead. + * @param requireDeletes If false, the returned reader + may not reflect all deletions + */ + public synchronized IndexSearcher getSearcher(boolean requireDeletes) { + final IndexSearcher s; + if (requireDeletes) { + s = currentSearcher; + } else if (noDeletesSearchingGen.get() > searchingGen.get()) { + s = noDeletesCurrentSearcher; + } else { + s = currentSearcher; + } + s.getIndexReader().incRef(); + return s; + } + + /** Call this if you require a searcher reflecting all + * changes as of the target generation. + * + * @param targetGen Returned searcher must reflect changes + * as of this generation + * + * @param applyDeletes If true, the returned searcher must + * reflect all deletions. This can be substantially more + * costly than not applying deletes. Note that if you + * pass false, it's still possible the deletes will have + * been applied. + **/ + public synchronized IndexSearcher getSearcher(long targetGen, boolean applyDeletes) { + + assert noDeletesSearchingGen.get() >= searchingGen.get(); + + if (targetGen > getCurrentSearchingGen(applyDeletes)) { + // Must wait + final long t0 = System.nanoTime(); + reopenThread.waiting(applyDeletes, targetGen); + while (targetGen > getCurrentSearchingGen(applyDeletes)) { + //System.out.println(Thread.currentThread().getName() + ": wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(applyDeletes) + " applyDeletes=" + applyDeletes); + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + //final long waitNS = System.nanoTime()-t0; + //System.out.println(Thread.currentThread().getName() + ": done wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(applyDeletes) + " applyDeletes=" + applyDeletes + " WAIT msec=" + (waitNS/1000000.0)); + } + + return getSearcher(applyDeletes); + } + + private long getCurrentSearchingGen(boolean requiresDeletes) { + return requiresDeletes ? searchingGen.get() : noDeletesSearchingGen.get(); + } + + /** Release the searcher obtained from {@link + * #getSearcher()} or {@link #getSearcher(long)}. */ + public void releaseSearcher(IndexSearcher s) throws IOException { + s.getIndexReader().decRef(); + } + + // returns false if searcher was current + private boolean reopen(boolean applyDeletes) throws IOException { + + // Mark gen as of when reopen started: + final long newSearcherGen = indexingGen.getAndIncrement(); + + if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) { + //System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen); + searchingGen.set(newSearcherGen); + noDeletesSearchingGen.set(newSearcherGen); + synchronized(this) { + notifyAll(); + } + //System.out.println("reopen: skip: return"); + return false; + } else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) { + //System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen); + noDeletesSearchingGen.set(newSearcherGen); + synchronized(this) { + notifyAll(); + } + //System.out.println("reopen: skip: return"); + return false; + } + + //System.out.println("indexingGen now " + indexingGen); + + // .reopen() returns a new reference: + + // Start from whichever searcher is most current: + final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher; + final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes); + + // Transfer reference to swapSearcher: + swapSearcher(new IndexSearcher(nextReader, es), + newSearcherGen, + applyDeletes); + + return true; + } + + // Steals a reference from newSearcher: + private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException { + //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes); + + // Always replace noDeletesCurrentSearcher: + if (noDeletesCurrentSearcher != null) { + noDeletesCurrentSearcher.getIndexReader().decRef(); + } + noDeletesCurrentSearcher = newSearcher; + assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen; + noDeletesSearchingGen.set(newSearchingGen); + + if (applyDeletes) { + // Deletes were applied, so we also update currentSearcher: + if (currentSearcher != null) { + currentSearcher.getIndexReader().decRef(); + } + currentSearcher = newSearcher; + if (newSearcher != null) { + newSearcher.getIndexReader().incRef(); + } + assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen; + searchingGen.set(newSearchingGen); + } + + notifyAll(); + //System.out.println(Thread.currentThread().getName() + ": done"); + } + + /** NOTE: caller must separately close the writer. */ + // @Override -- not until Java 1.6 + public void close() throws IOException { + reopenThread.finish(); + swapSearcher(null, indexingGen.getAndIncrement(), true); + } +} Index: lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java Wed Jun 08 17:58:33 2011 -0400 @@ -0,0 +1,676 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.Fieldable; +import org.apache.lucene.index.codecs.CodecProvider; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.store.NRTCachingDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LineFileDocs; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.NamedThreadFactory; +import org.apache.lucene.util._TestUtil; +import org.junit.Test; + +// TODO +// - mix in optimize, addIndexes +// - randomoly mix in non-congruent docs + +// NOTE: This is a copy of TestNRTThreads, but swapping in +// NRTManager for adding/updating/searching + +public class TestNRTManager extends LuceneTestCase { + + private static class SubDocs { + public final String packID; + public final List subIDs; + public boolean deleted; + + public SubDocs(String packID, List subIDs) { + this.packID = packID; + this.subIDs = subIDs; + } + } + + // TODO: is there a pre-existing way to do this!!! + private Document cloneDoc(Document doc1) { + final Document doc2 = new Document(); + for(Fieldable f : doc1.getFields()) { + Field field1 = (Field) f; + + Field field2 = new Field(field1.name(), + field1.stringValue(), + field1.isStored() ? Field.Store.YES : Field.Store.NO, + field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO); + if (field1.getOmitNorms()) { + field2.setOmitNorms(true); + } + if (field1.getOmitTermFreqAndPositions()) { + field2.setOmitTermFreqAndPositions(true); + } + doc2.add(field2); + } + + return doc2; + } + + @Test + public void testNRTManager() throws Exception { + + final long t0 = System.currentTimeMillis(); + + if (CodecProvider.getDefault().getDefaultFieldCodec().equals("SimpleText")) { + // no + CodecProvider.getDefault().setDefaultFieldCodec("Standard"); + } + + final LineFileDocs docs = new LineFileDocs(random); + final File tempDir = _TestUtil.getTempDir("nrtopenfiles"); + final MockDirectoryWrapper _dir = newFSDirectory(tempDir); + _dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves + Directory dir = _dir; + final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).setOpenMode(IndexWriterConfig.OpenMode.CREATE); + + // newIWConfig makes smallish max seg size, which + // results in tons and tons of segments for this test + // when run nightly: + MergePolicy mp = conf.getMergePolicy(); + if (mp instanceof TieredMergePolicy) { + ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.); + } else if (mp instanceof LogByteSizeMergePolicy) { + ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.); + } else if (mp instanceof LogMergePolicy) { + ((LogMergePolicy) mp).setMaxMergeDocs(100000); + } + + conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() { + @Override + public void warm(IndexReader reader) throws IOException { + if (VERBOSE) { + System.out.println("TEST: now warm merged reader=" + reader); + } + final int maxDoc = reader.maxDoc(); + final Bits delDocs = reader.getDeletedDocs(); + int sum = 0; + final int inc = Math.max(1, maxDoc/50); + for(int docID=0;docID lastGens = new ArrayList(); + + final Set delIDs = Collections.synchronizedSet(new HashSet()); + final List allSubDocs = Collections.synchronizedList(new ArrayList()); + + final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; + Thread[] threads = new Thread[NUM_INDEX_THREADS]; + for(int thread=0;thread toDeleteIDs = new ArrayList(); + final List toDeleteSubDocs = new ArrayList(); + + long gen = 0; + while(System.currentTimeMillis() < stopTime && !failed.get()) { + + //System.out.println(Thread.currentThread().getName() + ": cycle"); + try { + // Occassional longish pause if running + // nightly + if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": now long sleep"); + } + Thread.sleep(_TestUtil.nextInt(random, 50, 500)); + } + + // Rate limit ingest rate: + Thread.sleep(_TestUtil.nextInt(random, 1, 10)); + if (VERBOSE) { + System.out.println(Thread.currentThread() + ": done sleep"); + } + + Document doc = docs.nextDoc(); + if (doc == null) { + break; + } + final String addedField; + if (random.nextBoolean()) { + addedField = "extra" + random.nextInt(10); + doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED)); + } else { + addedField = null; + } + if (random.nextBoolean()) { + + if (random.nextBoolean()) { + // Add a pack of adjacent sub-docs + final String packID; + final SubDocs delSubDocs; + if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) { + delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size())); + assert !delSubDocs.deleted; + toDeleteSubDocs.remove(delSubDocs); + // reuse prior packID + packID = delSubDocs.packID; + } else { + delSubDocs = null; + // make new packID + packID = packCount.getAndIncrement() + ""; + } + + final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED); + final List docIDs = new ArrayList(); + final SubDocs subDocs = new SubDocs(packID, docIDs); + final List docsList = new ArrayList(); + + allSubDocs.add(subDocs); + doc.add(packIDField); + docsList.add(cloneDoc(doc)); + docIDs.add(doc.get("docid")); + + final int maxDocCount = _TestUtil.nextInt(random, 1, 10); + while(docsList.size() < maxDocCount) { + doc = docs.nextDoc(); + if (doc == null) { + break; + } + docsList.add(cloneDoc(doc)); + docIDs.add(doc.get("docid")); + } + addCount.addAndGet(docsList.size()); + + if (delSubDocs != null) { + delSubDocs.deleted = true; + delIDs.addAll(delSubDocs.subIDs); + delCount.addAndGet(delSubDocs.subIDs.size()); + if (VERBOSE) { + System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs); + } + gen = nrt.updateDocuments(new Term("packID", delSubDocs.packID), docsList); + /* + // non-atomic: + nrt.deleteDocuments(new Term("packID", delSubDocs.packID)); + for(Document subDoc : docsList) { + nrt.addDocument(subDoc); + } + */ + } else { + if (VERBOSE) { + System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs); + } + gen = nrt.addDocuments(docsList); + + /* + // non-atomic: + for(Document subDoc : docsList) { + nrt.addDocument(subDoc); + } + */ + } + doc.removeField("packID"); + + if (random.nextInt(5) == 2) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID); + } + toDeleteSubDocs.add(subDocs); + } + + // randomly verify the add/update "took": + if (random.nextInt(20) == 2) { + final boolean applyDeletes = delSubDocs != null; + final IndexSearcher s = nrt.getSearcher(gen, applyDeletes); + try { + assertEquals(docsList.size(), s.search(new TermQuery(new Term("packID", packID)), 10).totalHits); + } finally { + nrt.releaseSearcher(s); + } + } + + } else { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": add doc docid:" + doc.get("docid")); + } + + gen = nrt.addDocument(doc); + addCount.getAndIncrement(); + + // randomly verify the add "took": + if (random.nextInt(20) == 2) { + //System.out.println(Thread.currentThread().getName() + ": verify"); + final IndexSearcher s = nrt.getSearcher(gen, false); + //System.out.println(Thread.currentThread().getName() + ": got s=" + s); + try { + assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits); + } finally { + nrt.releaseSearcher(s); + } + //System.out.println(Thread.currentThread().getName() + ": done verify"); + } + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(doc.get("docid")); + } + } + } else { + // we use update but it never replaces a + // prior doc + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid")); + } + gen = nrt.updateDocument(new Term("docid", doc.get("docid")), doc); + addCount.getAndIncrement(); + + // randomly verify the add "took": + if (random.nextInt(20) == 2) { + final IndexSearcher s = nrt.getSearcher(gen, true); + try { + assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits); + } finally { + nrt.releaseSearcher(s); + } + } + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); + } + toDeleteIDs.add(doc.get("docid")); + } + } + + if (random.nextInt(30) == 17) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); + } + for(String id : toDeleteIDs) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": del term=id:" + id); + } + gen = nrt.deleteDocuments(new Term("docid", id)); + + // randomly verify the delete "took": + if (random.nextInt(20) == 7) { + final IndexSearcher s = nrt.getSearcher(gen, true); + try { + assertEquals(0, s.search(new TermQuery(new Term("docid", id)), 10).totalHits); + } finally { + nrt.releaseSearcher(s); + } + } + } + + final int count = delCount.addAndGet(toDeleteIDs.size()); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes"); + } + delIDs.addAll(toDeleteIDs); + toDeleteIDs.clear(); + + for(SubDocs subDocs : toDeleteSubDocs) { + assertTrue(!subDocs.deleted); + gen = nrt.deleteDocuments(new Term("packID", subDocs.packID)); + subDocs.deleted = true; + if (VERBOSE) { + System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID); + } + delIDs.addAll(subDocs.subIDs); + delCount.addAndGet(subDocs.subIDs.size()); + + // randomly verify the delete "took": + if (random.nextInt(20) == 7) { + final IndexSearcher s = nrt.getSearcher(gen, true); + try { + assertEquals(0, s.search(new TermQuery(new Term("packID", subDocs.packID)), 1).totalHits); + } finally { + nrt.releaseSearcher(s); + } + } + } + toDeleteSubDocs.clear(); + } + if (addedField != null) { + doc.removeField(addedField); + } + } catch (Throwable t) { + System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc"); + t.printStackTrace(); + failed.set(true); + throw new RuntimeException(t); + } + } + + lastGens.add(gen); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": indexing done"); + } + } + }; + threads[thread].setDaemon(true); + threads[thread].start(); + } + + if (VERBOSE) { + System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]"); + } + + // let index build up a bit + Thread.sleep(100); + + // silly starting guess: + final AtomicInteger totTermCount = new AtomicInteger(100); + + // run search threads + final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS]; + final AtomicInteger totHits = new AtomicInteger(); + + if (VERBOSE) { + System.out.println("TEST: start search threads"); + } + + for(int thread=0;thread 0) { + Fields fields = MultiFields.getFields(s.getIndexReader()); + if (fields == null) { + continue; + } + Terms terms = fields.terms("body"); + if (terms == null) { + continue; + } + + TermsEnum termsEnum = terms.iterator(); + int seenTermCount = 0; + int shift; + int trigger; + if (totTermCount.get() == 0) { + shift = 0; + trigger = 1; + } else { + shift = random.nextInt(totTermCount.get()/10); + trigger = totTermCount.get()/10; + } + + while(System.currentTimeMillis() < stopTime) { + BytesRef term = termsEnum.next(); + if (term == null) { + if (seenTermCount == 0) { + break; + } + totTermCount.set(seenTermCount); + seenTermCount = 0; + if (totTermCount.get() == 0) { + shift = 0; + trigger = 1; + } else { + trigger = totTermCount.get()/10; + //System.out.println("trigger " + trigger); + shift = random.nextInt(totTermCount.get()/10); + } + termsEnum.seek(new BytesRef("")); + continue; + } + seenTermCount++; + // search 10 terms + if (trigger == 0) { + trigger = 1; + } + if ((seenTermCount + shift) % trigger == 0) { + //if (VERBOSE) { + //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString()); + //} + totHits.addAndGet(runQuery(s, new TermQuery(new Term("body", term)))); + } + } + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": search done"); + } + } + } finally { + nrt.releaseSearcher(s); + } + } catch (Throwable t) { + System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc"); + failed.set(true); + t.printStackTrace(System.out); + throw new RuntimeException(t); + } + } + } + }; + searchThreads[thread].setDaemon(true); + searchThreads[thread].start(); + } + + if (VERBOSE) { + System.out.println("TEST: now join"); + } + for(int thread=0;thread