Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java Thu Jun 09 18:48:18 2011 -0400 @@ -0,0 +1,357 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.List; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.ThreadInterruptedException; + +// TODO +// - we could make this work also w/ "normal" reopen/commit? + +/** + * Utility class to manage sharing near-real-time searchers + * across multiple searching threads. + * + *
NOTE: to use this class, you must call reopen + * periodically. The {@link NRTManagerReopenThread} is a + * simple class to do this on a periodic basis. If you + * implement your own reopener, be sure to call {@link + * #addWaitingListener} so your reopener is notified when a + * caller is waiting for a specific generation searcher.
+ * + * @lucene.experimental +*/ + +public class NRTManager implements Closeable { + private final IndexWriter writer; + private final ExecutorService es; + private final AtomicLong indexingGen; + private final AtomicLong searchingGen; + private final AtomicLong noDeletesSearchingGen; + private final Listfalse for
+ * requireDeletes, you can get faster reopen time, but
+ * the returned reader is allowed to not reflect all
+ * deletions. See {@link IndexReader#open(IndexWriter,boolean)} */
+ public synchronized IndexSearcher get(boolean requireDeletes) {
+   // The no-deletes searcher is only handed out when the caller tolerates
+   // missing deletes AND its generation is strictly newer than that of the
+   // deletes-applied searcher; in every other case currentSearcher is used.
+   final boolean preferNoDeletes =
+       !requireDeletes && noDeletesSearchingGen.get() > searchingGen.get();
+   final IndexSearcher chosen = preferNoDeletes ? noDeletesCurrentSearcher : currentSearcher;
+   // Take a reference on behalf of the caller before handing it out:
+   chosen.getIndexReader().incRef();
+   return chosen;
+ }
+
+ /** Returns a searcher that reflects every change as of
+  * the target generation, with all deletions applied.
+  * Equivalent to {@code get(targetGen, true)}.
+  *
+  * @param targetGen Returned searcher must reflect changes
+  * as of this generation
+  */
+ public synchronized IndexSearcher get(long targetGen) {
+   final boolean requireDeletes = true;
+   return get(targetGen, requireDeletes);
+ }
+
+ /** Returns a searcher reflecting all changes as of the
+  * target generation, optionally without requiring that
+  * deletions be reflected. If the current searcher is not
+  * yet recent enough, every registered waiting listener
+  * is notified once and the calling thread then blocks
+  * until the searching generation has caught up. Note
+  * that the returned searcher may still reflect some or
+  * all deletions even when they were not required.
+  *
+  * @param targetGen Returned searcher must reflect changes
+  * as of this generation
+  *
+  * @param requireDeletes If true, the returned searcher must
+  * reflect all deletions. This can be substantially more
+  * costly than not applying deletes. Note that if you
+  * pass false, it's still possible that some or all
+  * deletes may have been applied.
+  *
+  * @throws ThreadInterruptedException if the calling thread
+  * is interrupted while waiting
+  **/
+ public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) {
+
+   assert noDeletesSearchingGen.get() >= searchingGen.get();
+
+   // Fast path: the current searcher is already recent enough.
+   if (targetGen <= getCurrentSearchingGen(requireDeletes)) {
+     return get(requireDeletes);
+   }
+
+   // Tell each listener, once, that a caller is about to block:
+   for (WaitingListener listener : waitingListeners) {
+     listener.waiting(requireDeletes, targetGen);
+   }
+
+   // Sleep on this monitor until the searching generation catches up;
+   // the condition is re-checked after every wakeup.
+   while (getCurrentSearchingGen(requireDeletes) < targetGen) {
+     try {
+       wait();
+     } catch (InterruptedException ie) {
+       throw new ThreadInterruptedException(ie);
+     }
+   }
+
+   return get(requireDeletes);
+ }
+
+ /** Returns the generation of the current searcher: the
+  * deletes-applied generation when {@code requiresDeletes}
+  * is true, otherwise the no-deletes generation. */
+ public long getCurrentSearchingGen(boolean requiresDeletes) {
+   if (requiresDeletes) {
+     return searchingGen.get();
+   }
+   return noDeletesSearchingGen.get();
+ }
+
+ /** Releases a searcher previously obtained from {@link
+  * #get()} or {@link #get(long)}, by dropping one
+  * reference on its underlying reader. */
+ public void release(IndexSearcher s) throws IOException {
+   final IndexReader reader = s.getIndexReader();
+   reader.decRef();
+ }
+
+ /** Call this when you need the NRT reader to reopen.
+ *
+ * @param applyDeletes If true, the newly opened reader
+ * will reflect all deletes
+ */
+ public boolean reopen(boolean applyDeletes) throws IOException {
+
+ // Mark gen as of when reopen started:
+ final long newSearcherGen = indexingGen.getAndIncrement();
+
+ if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) {
+ //System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen);
+ searchingGen.set(newSearcherGen);
+ noDeletesSearchingGen.set(newSearcherGen);
+ synchronized(this) {
+ notifyAll();
+ }
+ //System.out.println("reopen: skip: return");
+ return false;
+ } else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) {
+ //System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen);
+ noDeletesSearchingGen.set(newSearcherGen);
+ synchronized(this) {
+ notifyAll();
+ }
+ //System.out.println("reopen: skip: return");
+ return false;
+ }
+
+ //System.out.println("indexingGen now " + indexingGen);
+
+ // .reopen() returns a new reference:
+
+ // Start from whichever searcher is most current:
+ final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher;
+ final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes);
+
+ warm(nextReader);
+
+ // Transfer reference to swapSearcher:
+ swapSearcher(new IndexSearcher(nextReader, es),
+ newSearcherGen,
+ applyDeletes);
+
+ return true;
+ }
+
+ /** Override this to warm the newly opened reader before
+ * it's swapped in. Note that this is called both for
+ * newly merged segments and for new top-level readers
+ * opened by {@link #reopen}. The default implementation
+ * does nothing.
+ *
+ * @param reader the newly opened reader to warm
+ * @throws IOException if warming fails */
+ protected void warm(IndexReader reader) throws IOException {
+ }
+
+ // Steals a reference from newSearcher:
+ private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException {
+ //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes);
+
+ // Always replace noDeletesCurrentSearcher:
+ if (noDeletesCurrentSearcher != null) {
+ noDeletesCurrentSearcher.getIndexReader().decRef();
+ }
+ noDeletesCurrentSearcher = newSearcher;
+ assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen;
+ noDeletesSearchingGen.set(newSearchingGen);
+
+ if (applyDeletes) {
+ // Deletes were applied, so we also update currentSearcher:
+ if (currentSearcher != null) {
+ currentSearcher.getIndexReader().decRef();
+ }
+ currentSearcher = newSearcher;
+ if (newSearcher != null) {
+ newSearcher.getIndexReader().incRef();
+ }
+ assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen;
+ searchingGen.set(newSearchingGen);
+ }
+
+ notifyAll();
+ //System.out.println(Thread.currentThread().getName() + ": done");
+ }
+
+ /** Swaps both current searchers out for null, dropping
+  * this manager's references on them.
+  * NOTE: caller must separately close the writer. */
+ // @Override -- not until Java 1.6
+ public void close() throws IOException {
+   final long closeGen = indexingGen.getAndIncrement();
+   swapSearcher(null, closeGen, true);
+ }
+}
Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java Thu Jun 09 18:48:18 2011 -0400
@@ -0,0 +1,202 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * Utility class that runs a reopen thread to periodically
+ * reopen the NRT searchers in the provided {@link
+ * NRTManager}.
+ *
+ * Typical usage looks like this: + * + *
+ * ... open your own writer ...
+ *
+ * NRTManager manager = new NRTManager(writer);
+ *
+ * // Refreshes searcher every 5 seconds when nobody is waiting, and up to 100 msec delay
+ * // when somebody is waiting:
+ * NRTManagerReopenThread reopenThread = new NRTManagerReopenThread(manager, 5.0, 0.1);
+ * reopenThread.setName("NRT Reopen Thread");
+ * reopenThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
+ * reopenThread.setDaemon(true);
+ * reopenThread.start();
+ *
+ *
+ * Then, for each incoming query, do this:
+ *
+ *
+ * // For each incoming query:
+ * IndexSearcher searcher = manager.get();
+ * try {
+ * // Use searcher to search...
+ * } finally {
+ * manager.release(searcher);
+ * }
+ *
+ *
+ * You should make changes using the NRTManager; if you later need to obtain
+ * a searcher reflecting those changes:
+ *
+ *
+ * // ... or updateDocument, deleteDocuments, etc:
+ * long gen = manager.addDocument(...);
+ *
+ * // Returned searcher is guaranteed to reflect the just added document
+ * IndexSearcher searcher = manager.get(gen);
+ * try {
+ * // Use searcher to search...
+ * } finally {
+ * manager.release(searcher);
+ * }
+ *
+ *
+ *
+ * When you are done, be sure to close both the manager and the reopen thread:
+ * + * reopenThread.close(); + * manager.close(); + *+ */ + +public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable { + private final NRTManager manager; + private final long targetMaxStaleNS; + private final long targetMinStaleNS; + private boolean finish; + private boolean waitingNeedsDeletes; + private long waitingGen; + + /** + * Create NRTManagerReopenThread, to periodically reopen the NRT searcher. + * + * @param targetMaxStaleSec Maximum time until a new + * reader must be opened; this sets the upper bound + * on how slowly reopens may occur + * + * @param targetMinStaleSec Mininum time until a new + * reader can be opened; this sets the lower bound + * on how quickly reopens may occur, when a caller + * is waiting for a specific indexing change to + * become visible. + */ + + public NRTManagerReopenThread(NRTManager manager, double targetMaxStaleSec, double targetMinStaleSec) { + if (targetMaxStaleSec < targetMinStaleSec) { + throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")"); + } + this.manager = manager; + this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec); + this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec); + manager.addWaitingListener(this); + } + + public synchronized void close() { + //System.out.println("NRT: set finish"); + manager.removeWaitingListener(this); + this.finish = true; + notify(); + try { + join(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + + public synchronized void waiting(boolean needsDeletes, long targetGen) { + waitingNeedsDeletes |= needsDeletes; + waitingGen = Math.max(waitingGen, targetGen); + notify(); + //System.out.println(Thread.currentThread().getName() + ": force wakeup waitingGen=" + waitingGen + " applyDeletes=" + applyDeletes + " waitingNeedsDeletes=" + waitingNeedsDeletes); + } + + @Override + public void run() { + // 
TODO: maybe use private thread ticktock timer, in + // case clock shift messes up nanoTime? + long lastReopenStartNS = System.nanoTime(); + + //System.out.println("reopen: start"); + try { + while (true) { + + final boolean doApplyDeletes; + + boolean hasWaiting = false; + + synchronized(this) { + // TODO: try to guestimate how long reopen might + // take based on past data? + + while (!finish) { + //System.out.println("reopen: cycle"); + + // True if we have someone waiting for reopen'd searcher: + hasWaiting = waitingGen > manager.getCurrentSearchingGen(waitingNeedsDeletes); + final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); + + final long sleepNS = nextReopenStartNS - System.nanoTime(); + + if (sleepNS > 0) { + //System.out.println("reopen: sleep " + (sleepNS/1000000.0) + " ms (hasWaiting=" + hasWaiting + ")"); + try { + wait(sleepNS/1000000, (int) (sleepNS%1000000)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + //System.out.println("NRT: set finish on interrupt"); + finish = true; + break; + } + } else { + break; + } + } + + if (finish) { + //System.out.println("reopen: finish"); + return; + } + + doApplyDeletes = hasWaiting ? 
waitingNeedsDeletes : true; + waitingNeedsDeletes = false; + //System.out.println("reopen: start hasWaiting=" + hasWaiting); + } + + lastReopenStartNS = System.nanoTime(); + try { + //final long t0 = System.nanoTime(); + manager.reopen(doApplyDeletes); + //System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec"); + } catch (IOException ioe) { + //System.out.println(Thread.currentThread().getName() + ": IOE"); + //ioe.printStackTrace(); + throw new RuntimeException(ioe); + } + } + } catch (Throwable t) { + //System.out.println("REOPEN EXC"); + //t.printStackTrace(System.out); + throw new RuntimeException(t); + } + } +} Index: lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java Thu Jun 09 18:48:18 2011 -0400 @@ -0,0 +1,684 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.Fieldable; +import org.apache.lucene.index.codecs.CodecProvider; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.store.NRTCachingDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LineFileDocs; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.NamedThreadFactory; +import org.apache.lucene.util._TestUtil; +import org.junit.Test; + +// TODO +// - mix in optimize, addIndexes +// - randomoly mix in non-congruent docs + +// NOTE: This is a copy of TestNRTThreads, but swapping in +// NRTManager for adding/updating/searching + +public class TestNRTManager extends LuceneTestCase { + + private static class SubDocs { + public final String packID; + public final List