Index: lucene/src/java/org/apache/lucene/index/NRTManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ lucene/src/java/org/apache/lucene/index/NRTManager.java Wed Mar 09 11:57:46 2011 -0500 @@ -0,0 +1,224 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.ThreadInterruptedException; + +// TODO +// - we could make this work also w/ "normal" reopen/commit? +// - add "mustSeeDeletes" here, after lucene gets it +// - maybe we own indexing threads, and maintain queue.. +// - merges segment warmer +// - tune up the max merges/threads + +/** @lucene.experimental */ +public class NRTManager implements Closeable { + private final IndexWriter w; + private final ReopenThread reopenThread; + private final ExecutorService es; + private volatile long indexingGen; + private volatile long searchingGen; + private volatile long waitingGen; + private volatile IndexSearcher currentSearcher; + + private class ReopenThread extends Thread { + private final long targetMaxStaleNS; + private final long targetMinStaleNS; + private boolean finish; + + public ReopenThread(long targetMaxStaleMS, long targetMinStaleMS) { + this.targetMaxStaleNS = 1000000*targetMaxStaleMS; + this.targetMinStaleNS = 1000000*targetMinStaleMS; + } + + public synchronized void finish() { + this.finish = true; + notify(); + try { + join(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + + public synchronized void waiting() { + System.out.println(Thread.currentThread() + ": force wakeup waitingGen=" + waitingGen); + notify(); + } + + @Override + public void run() { + // TODO: maybe use private thread ticktock timer? in + // case clock shift messes up nanoTime: + long lastReopenNS = System.nanoTime(); + System.out.println("reopen: start"); + while(true) { + synchronized(this) { + // TODO: try to guestimate how long reopen might + // take based on past data? + + while(!finish) { + final long sleepNS = lastReopenNS + (waitingGen > searchingGen ? targetMinStaleNS : targetMaxStaleNS) - System.nanoTime(); + + if (sleepNS > 0) { + System.out.println("reopen: sleep " + (sleepNS/1000000.0) + " ms"); + try { + wait(sleepNS/1000000, (int) (sleepNS%1000000)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + finish = true; + break; + } + } else { + break; + } + } + + if (finish) { + return; + } + } + + lastReopenNS = System.nanoTime(); + try { + reopen(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + } + } + + public NRTManager(IndexWriter w, long targetMaxStaleMS, long targetMinStaleMS, ExecutorService es) throws IOException { + // Create initial reader: + this.w = w; + this.es = es; + // nocommit -- make true v false controllable: + swapSearcher(new IndexSearcher(IndexReader.open(w, true), es), 0); + indexingGen = 1; + reopenThread = new ReopenThread(targetMaxStaleMS, targetMinStaleMS); + reopenThread.setName("NRT Reopen"); + reopenThread.setDaemon(true); + reopenThread.setPriority(Thread.currentThread().getPriority()+2); + reopenThread.start(); + } + + public long getGen() { + return indexingGen; + } + + public long updateDocument(Term t, Document d, Analyzer a) throws IOException { + w.updateDocument(t, d, a); + // Return gen as of when indexing finished: + return indexingGen; + } + + public long updateDocument(Term t, Document d) throws IOException { + w.updateDocument(t, d); + // Return gen as of when indexing finished: + return indexingGen; + } + + public long deleteDocuments(Term t) throws IOException { + w.deleteDocuments(t); + // Return gen as of when indexing finished: + return indexingGen; + } + + public long deleteDocuments(Query q) throws IOException { + w.deleteDocuments(q); + // Return gen as of when indexing finished: + return indexingGen; + } + + public long addDocument(Document d, Analyzer a) throws IOException { + w.addDocument(d, a); + // Return gen as of when indexing finished: + return indexingGen; + } + + public long addDocument(Document d) throws IOException { + w.addDocument(d); + // Return gen as of when indexing finished: + return indexingGen; + } + + /** Call this if you don't require a certain generation. */ + public synchronized IndexSearcher getSearcher() { + currentSearcher.getIndexReader().incRef(); + return currentSearcher; + } + + /** Call this if you require a searcher reflecting all + * changes as of the target generation */ + public synchronized IndexSearcher getSearcher(long targetGen) { + if (targetGen > searchingGen) { + final long t0 = System.nanoTime(); + waitingGen = Math.max(waitingGen, targetGen); + reopenThread.waiting(); + while (targetGen > searchingGen) { + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + final long waitNS = System.nanoTime()-t0; + System.out.println("WAIT ms=" + (waitNS/1000000.0)); + } + return getSearcher(); + } + + public void releaseSearcher(IndexSearcher s) throws IOException { + s.getIndexReader().decRef(); + } + + private void reopen() throws IOException { + // Mark gen as of when reopen started: + final long newSearcherGen = indexingGen++; + swapSearcher(new IndexSearcher(currentSearcher.getIndexReader().reopen(), es), + newSearcherGen); + } + + private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen) throws IOException { + System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen); + if (currentSearcher != null) { + currentSearcher.getIndexReader().decRef(); + } + // newSearcher arrives w/ RC 1; we steal it: + currentSearcher = newSearcher; + searchingGen = newSearchingGen; + notifyAll(); + System.out.println(Thread.currentThread().getName() + ": done"); + } + + /** NOTE: caller must separately close the writer. */ + // @Override -- not until Java 1.6 + public void close() throws IOException { + reopenThread.finish(); + swapSearcher(null, 0); + } +} Index: lucene/src/test/org/apache/lucene/index/TestNRTManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ lucene/src/test/org/apache/lucene/index/TestNRTManager.java Wed Mar 09 11:57:46 2011 -0500 @@ -0,0 +1,388 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.codecs.CodecProvider; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.document.Field; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.util.NamedThreadFactory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LineFileDocs; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util._TestUtil; +import org.junit.Test; + +// TODO +// - mix in optimize, addIndexes +// - randomoly mix in non-congruent docs + +public class TestNRTManager extends LuceneTestCase { + + @Test + public void testNRTManager() throws Exception { + + final long t0 = System.currentTimeMillis(); + + if (CodecProvider.getDefault().getDefaultFieldCodec().equals("SimpleText")) { + // no + CodecProvider.getDefault().setDefaultFieldCodec("Standard"); + } + + final LineFileDocs docs = new LineFileDocs(random); + final File tempDir = _TestUtil.getTempDir("nrtopenfiles"); + final MockDirectoryWrapper dir = new MockDirectoryWrapper(random, FSDirectory.open(tempDir)); + final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()); + conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() { + @Override + public void warm(IndexReader reader) throws IOException { + if (VERBOSE) { + System.out.println("TEST: now warm merged reader=" + reader); + } + final int maxDoc = reader.maxDoc(); + final Bits delDocs = reader.getDeletedDocs(); + int sum = 0; + final int inc = Math.max(1, maxDoc/50); + for(int docID=0;docID delIDs = Collections.synchronizedSet(new HashSet()); + + final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; + Thread[] threads = new Thread[NUM_INDEX_THREADS]; + for(int thread=0;thread toDeleteIDs = new ArrayList(); + while(System.currentTimeMillis() < stopTime && !failed.get()) { + try { + Document doc = docs.nextDoc(); + if (doc == null) { + break; + } + final String addedField; + if (random.nextBoolean()) { + addedField = "extra" + random.nextInt(10); + doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED)); + } else { + addedField = null; + } + + final long gen; + + if (random.nextBoolean()) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("id")); + } + gen = nrt.addDocument(doc); + } else { + // we use update but it never replaces a + // prior doc + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("id")); + } + gen = nrt.updateDocument(new Term("id", doc.get("id")), doc); + } + + // randomly verify the add "took": + if (random.nextInt(10) == 7) { + final IndexSearcher s = nrt.getSearcher(gen); + try { + assertEquals(1, s.search(new TermQuery(new Term("id", doc.get("id"))), 10).totalHits); + } finally { + nrt.releaseSearcher(s); + } + } + + if (random.nextInt(5) == 3) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("id")); + } + toDeleteIDs.add(doc.get("id")); + } + + if (random.nextInt(50) == 17) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); + } + for(String id : toDeleteIDs) { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": del term=id:" + id); + } + final long delGen = nrt.deleteDocuments(new Term("id", id)); + + // randomly verify the delete "took": + if (random.nextInt(10) == 7) { + final IndexSearcher s = nrt.getSearcher(delGen); + try { + assertEquals(0, s.search(new TermQuery(new Term("id", id)), 10).totalHits); + } finally { + nrt.releaseSearcher(s); + } + } + } + final int count = delCount.addAndGet(toDeleteIDs.size()); + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes"); + } + delIDs.addAll(toDeleteIDs); + toDeleteIDs.clear(); + } + addCount.getAndIncrement(); + if (addedField != null) { + doc.removeField(addedField); + } + } catch (Exception exc) { + System.out.println(Thread.currentThread().getName() + ": hit exc"); + exc.printStackTrace(); + failed.set(true); + throw new RuntimeException(exc); + } + } + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": indexing done"); + } + } + }; + threads[thread].setDaemon(true); + threads[thread].start(); + } + + if (VERBOSE) { + System.out.println("TEST: DONE start threads [" + (System.currentTimeMillis()-t0) + " ms]"); + } + + // let index build up a bit + Thread.sleep(100); + + boolean any = false; + + // silly starting guess: + final AtomicInteger totTermCount = new AtomicInteger(100); + + // run search threads + final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS]; + final AtomicInteger totHits = new AtomicInteger(); + + for(int thread=0;thread