Index: lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java =================================================================== --- lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java (revision 1179425) +++ lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java (working copy) @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.analysis.MockAnalyzer; @@ -29,6 +31,7 @@ import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.NamedThreadFactory; import org.apache.lucene.util._TestUtil; public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase { @@ -41,25 +44,35 @@ @Override protected IndexSearcher getFinalSearcher() throws Exception { - writer.commit(); - mgr.maybeReopen(); + if (!isNRT) { + writer.commit(); + } + assertTrue(mgr.maybeReopen() || mgr.isSearcherCurrent()); return mgr.acquire(); } private SearcherManager mgr; + private boolean isNRT; @Override protected void doAfterWriter(ExecutorService es) throws Exception { // SearcherManager needs to see empty commit: - writer.commit(); - mgr = new SearcherManager(dir, - new SearcherWarmer() { - @Override - public void warm(IndexSearcher s) throws IOException { - TestSearcherManager.this.warmCalled = true; - s.search(new TermQuery(new Term("body", "united")), 10); - } - }, es); + final SearcherWarmer warmer = new SearcherWarmer() { + @Override + public void warm(IndexSearcher s) throws IOException { + TestSearcherManager.this.warmCalled = true; + s.search(new TermQuery(new Term("body", "united")), 10); + } + }; + if (random.nextBoolean()) { + mgr = SearcherManager.open(writer, true, warmer, es); + isNRT = true; + } else { + writer.commit(); + mgr = SearcherManager.open(dir, warmer, es); + isNRT = false; + } + } @Override @@ -126,19 +139,20 @@ writer.commit(); final CountDownLatch awaitEnterWarm = new CountDownLatch(1); final CountDownLatch awaitClose = new CountDownLatch(1); - - final SearcherManager searcherManager = new SearcherManager(dir, - new SearcherWarmer() { - @Override - public void warm(IndexSearcher s) throws IOException { - try { - awaitEnterWarm.countDown(); - awaitClose.await(); - } catch (InterruptedException e) { - // - } - } - }); + final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory("testIntermediateClose")); + final SearcherWarmer warmer = new SearcherWarmer() { + @Override + public void warm(IndexSearcher s) throws IOException { + try { + awaitEnterWarm.countDown(); + awaitClose.await(); + } catch (InterruptedException e) { + // + } + } + }; + final SearcherManager searcherManager = random.nextBoolean() ? SearcherManager.open(dir, + warmer, es) : SearcherManager.open(writer, random.nextBoolean(), warmer, es); IndexSearcher searcher = searcherManager.acquire(); try { assertEquals(1, searcher.getIndexReader().numDocs()); @@ -185,6 +199,9 @@ assertNull("" + exc[0], exc[0]); writer.close(); dir.close(); - + if (es != null) { + es.shutdown(); + es.awaitTermination(1, TimeUnit.SECONDS); + } } } Index: lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java =================================================================== --- lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java (revision 1179425) +++ lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java (working copy) @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.SearcherWarmer; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; @@ -41,7 +42,8 @@ if (VERBOSE) { System.out.println("TEST: finalSearcher maxGen=" + maxGen); } - return nrt.get(maxGen, true); + final SearcherManager manager = nrt.waitForGeneration(maxGen, true); + return manager.acquire(); } @Override @@ -67,14 +69,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - final IndexSearcher s = nrt.get(gen, true); + SearcherManager manager = nrt.waitForGeneration(gen, true); + final IndexSearcher s = manager.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } @@ -89,14 +92,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - final IndexSearcher s = nrt.get(gen, false); + final SearcherManager manager = nrt.waitForGeneration(gen, false); + final IndexSearcher s = manager.acquire();// nocommit get(gen, false); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } lastGens.set(gen); @@ -111,14 +115,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - final IndexSearcher s = nrt.get(gen, false); + final SearcherManager manager = nrt.waitForGeneration(gen, false); + final IndexSearcher s = manager.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(1, s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } lastGens.set(gen); @@ -132,14 +137,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - final IndexSearcher s = nrt.get(gen, true); + final SearcherManager manager = nrt.waitForGeneration(gen, true); + final IndexSearcher s = manager.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(1, s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } lastGens.set(gen); @@ -153,14 +159,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); } - final IndexSearcher s = nrt.get(gen, true); + final SearcherManager manager = nrt.waitForGeneration(gen, true); + final IndexSearcher s = manager.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(0, s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } lastGens.set(gen); @@ -168,7 +175,6 @@ private NRTManager nrt; private NRTManagerReopenThread nrtThread; - @Override protected void doAfterWriter(ExecutorService es) throws Exception { final double minReopenSec = 0.01 + 0.05 * random.nextDouble(); @@ -185,7 +191,8 @@ TestNRTManager.this.warmCalled = true; s.search(new TermQuery(new Term("body", "united")), 10); } - }); + }, false); + nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec); nrtThread.setName("NRT Reopen Thread"); nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); @@ -214,7 +221,7 @@ @Override protected IndexSearcher getCurrentSearcher() throws Exception { - return nrt.get(random.nextBoolean()); + return nrt.acquireLatest(); } @Override Index: lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java (revision 1179425) +++ lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java (working copy) @@ -17,11 +17,11 @@ * limitations under the License. */ -import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.NRTManager; // javadocs @@ -29,119 +29,83 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; -/** Utility class to safely share {@link IndexSearcher} instances - * across multiple threads, while periodically reopening. - * This class ensures each IndexSearcher instance is not - * closed until it is no longer needed. - * - *
Use {@link #acquire} to obtain the current searcher, and - * {@link #release} to release it, like this: - * - *
- * IndexSearcher s = manager.acquire();
- * try {
- * // Do searching, doc retrieval, etc. with s
- * } finally {
- * manager.release(s);
- * }
- * // Do not use s after this!
- * s = null;
- *
- *
- * In addition you should periodically call {@link - * #maybeReopen}. While it's possible to call this just - * before running each query, this is discouraged since it - * penalizes the unlucky queries that do the reopen. It's - * better to use a separate background thread, that - * periodically calls maybeReopen. Finally, be sure to - * call {@link #close} once you are done. - * - *
NOTE: if you have an {@link IndexWriter}, it's - * better to use {@link NRTManager} since that class pulls - * near-real-time readers from the IndexWriter. - * - * @lucene.experimental +/** + * Utility class to safely share {@link IndexSearcher} instances across multiple + * threads, while periodically reopening. This class ensures each searcher is + * closed only once all threads have finished using it. + * + *
+ * Use {@link #acquire} to obtain the current searcher, and {@link #release} to + * release it, like this: + * + *
+ * IndexSearcher s = manager.acquire();
+ * try {
+ * // Do searching, doc retrieval, etc. with s
+ * } finally {
+ * manager.release(s);
+ * }
+ * // Do not use s after this!
+ * s = null;
+ *
+ *
+ * + * In addition you should periodically call {@link #maybeReopen}. While it's + * possible to call this just before running each query, this is discouraged + * since it penalizes the unlucky queries that do the reopen. It's better to use + * a separate background thread, that periodically calls maybeReopen. Finally, + * be sure to call {@link #close} once you are done. + * + *
+ * NOTE: if you have an {@link IndexWriter}, it's better to use + * {@link NRTManager} since that class pulls near-real-time readers from the + * IndexWriter. + * + * @lucene.experimental */ -public class SearcherManager implements Closeable { +public abstract class SearcherManager { - // Current searcher - private volatile IndexSearcher currentSearcher; - private final SearcherWarmer warmer; - private final Semaphore reopening = new Semaphore(1); - private final ExecutorService es; - - /** Opens an initial searcher from the Directory. - * - * @param dir Directory to open the searcher from - * - * @param warmer optional {@link SearcherWarmer}. Pass - * null if you don't require the searcher to warmed - * before going live. - * - *
NOTE: the provided {@link SearcherWarmer} is - * not invoked for the initial searcher; you should - * warm it yourself if necessary. - */ - public SearcherManager(Directory dir, SearcherWarmer warmer) throws IOException { - this(dir, warmer, null); - } - - /** Opens an initial searcher from the Directory. - * - * @param dir Directory to open the searcher from - * - * @param warmer optional {@link SearcherWarmer}. Pass - * null if you don't require the searcher to warmed - * before going live. - * - * @param es optional ExecutorService so different segments can - * be searched concurrently (see {@link - * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null - * to search segments sequentially. - * - *
NOTE: the provided {@link SearcherWarmer} is - * not invoked for the initial searcher; you should - * warm it yourself if necessary. - */ - public SearcherManager(Directory dir, SearcherWarmer warmer, ExecutorService es) throws IOException { + protected volatile IndexSearcher currentSearcher; + protected final ExecutorService es; + protected final SearcherWarmer warmer; + protected final Semaphore reopenLock = new Semaphore(1); + + protected SearcherManager(IndexReader openedReader, SearcherWarmer warmer, + ExecutorService es) throws IOException { this.es = es; - currentSearcher = new IndexSearcher(IndexReader.open(dir), this.es); this.warmer = warmer; + currentSearcher = new IndexSearcher(openedReader, es); } - /** You must call this, periodically, to perform a - * reopen. This calls {@link IndexReader#openIfChanged} on the - * underlying reader, and if that returns a new reader, - * it's warmed (if you provided a {@link SearcherWarmer} - * and then swapped into production. - * - *
Threads: it's fine for more than one thread to - * call this at once. Only the first thread will attempt - * the reopen; subsequent threads will see that another - * thread is already handling reopen and will return - * immediately. Note that this means if another thread - * is already reopening then subsequent threads will - * return right away without waiting for the reader - * reopen to complete.
- * - *This method returns true if a new reader was in - * fact opened.
+ /** + * You must call this, periodically, to perform a reopen. This calls + * {@link IndexReader#reopen} on the underlying reader, and if that returns a + * new reader, it's warmed (if you provided a {@link SearcherWarmer} and then + * swapped into production. + * + *+ * Threads: it's fine for more than one thread to call this at once. + * Only the first thread will attempt the reopen; subsequent threads will see + * that another thread is already handling reopen and will return immediately. + * Note that this means if another thread is already reopening then subsequent + * threads will return right away without waiting for the reader reopen to + * complete. + *
+ * + *+ * This method returns true if a new reader was in fact opened. + *
*/ - public boolean maybeReopen() - throws IOException { - - if (currentSearcher == null) { - throw new AlreadyClosedException("this SearcherManager is closed"); - } - + public boolean maybeReopen() throws IOException { + ensureOpen(); // Ensure only 1 thread does reopen at once; other // threads just return immediately: - if (reopening.tryAcquire()) { + if (reopenLock.tryAcquire()) { try { - IndexReader newReader = IndexReader.openIfChanged(currentSearcher.getIndexReader()); + final IndexReader newReader = openIfChanged(currentSearcher.getIndexReader()); if (newReader != null) { - IndexSearcher newSearcher = new IndexSearcher(newReader, es); + final IndexSearcher newSearcher = new IndexSearcher(newReader, es); boolean success = false; try { if (warmer != null) { @@ -159,16 +123,57 @@ return false; } } finally { - reopening.release(); + reopenLock.release(); } } else { return false; } } - /** Obtain the current IndexSearcher. You must match - * every call to acquire with one call to {@link #release}; - * it's best to do so in a finally clause. */ + /** + * Returnstrue if no changes have occured since this searcher
+ * ie. reader was opened, otherwise false.
+ * @see IndexReader#isCurrent()
+ */
+ public boolean isSearcherCurrent() throws CorruptIndexException,
+ IOException {
+ final IndexSearcher searcher = acquire();
+ try {
+ return searcher.getIndexReader().isCurrent();
+ } finally {
+ release(searcher);
+ }
+ }
+
+ /**
+ * Release the searcher previously obtained with {@link #acquire}.
+ *
+ * + * NOTE: it's safe to call this after {@link #close}. + */ + public void release(IndexSearcher searcher) throws IOException { + assert searcher != null; + searcher.getIndexReader().decRef(); + } + + /** + * Close this SearcherManager to future searching. Any searches still in + * process in other threads won't be affected, and they should still call + * {@link #release} after they are done. + */ + public synchronized void close() throws IOException { + if (currentSearcher != null) { + // make sure we can call this more than once + // closeable javadoc says: + // if this is already closed then invoking this method has no effect. + swapSearcher(null); + } + } + + /** + * Obtain the current IndexSearcher. You must match every call to acquire with + * one call to {@link #release}; it's best to do so in a finally clause. + */ public IndexSearcher acquire() { IndexSearcher searcher; do { @@ -177,40 +182,130 @@ } } while (!searcher.getIndexReader().tryIncRef()); return searcher; - } - - /** Release the searcher previously obtained with {@link - * #acquire}. - * - *
NOTE: it's safe to call this after {@link
- * #close}. */
- public void release(IndexSearcher searcher)
- throws IOException {
- searcher.getIndexReader().decRef();
}
- // Replaces old searcher with new one - needs to be synced to make close() work
- private synchronized void swapSearcher(IndexSearcher newSearcher)
- throws IOException {
- IndexSearcher oldSearcher = currentSearcher;
- if (oldSearcher == null) {
+ private void ensureOpen() {
+ if (currentSearcher == null) {
throw new AlreadyClosedException("this SearcherManager is closed");
}
+ }
+
+ protected synchronized void swapSearcher(IndexSearcher newSearcher) throws IOException {
+ ensureOpen();
+ final IndexSearcher oldSearcher = currentSearcher;
currentSearcher = newSearcher;
release(oldSearcher);
}
- /** Close this SearcherManager to future searching. Any
- * searches still in process in other threads won't be
- * affected, and they should still call {@link #release}
- * after they are done. */
- @Override
- public synchronized void close() throws IOException {
- if (currentSearcher != null) {
- // make sure we can call this more than once
- // closeable javadoc says:
- // if this is already closed then invoking this method has no effect.
- swapSearcher(null);
+ protected abstract IndexReader openIfChanged(IndexReader oldReader)
+ throws IOException;
+
+ /**
+ * Creates and returns a new SearcherManager from the given {@link IndexWriter}.
+ * @param writer the IndexWriter to open the IndexReader from.
+ * @param applyAllDeletes If true, all buffered deletes will
+ * be applied (made visible) in the {@link IndexSearcher} / {@link IndexReader}.
+ * If false, the deletes are not applied but remain buffered
+ * (in IndexWriter) so that they will be applied in the future.
+ * Applying deletes can be costly, so if your app can tolerate deleted documents
+ * being returned you might gain some performance by passing false.
+ * @param warmer optional {@link SearcherWarmer}. Pass
+ * null if you don't require the searcher to warmed
+ * before going live. If this is non-null then a
+ * merged segment warmer is installed on the
+ * provided IndexWriter's config.
+ * @param optional ExecutorService so different segments can
+ * be searched concurrently (see {@link
+ * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null
+ * to search segments sequentially.
+ *
+ * @see IndexReader#openIfChanged(IndexReader, IndexWriter, boolean)
+ * @throws IOException
+ */
+ public static SearcherManager open(IndexWriter writer, boolean applyAllDeletes,
+ SearcherWarmer warmer, ExecutorService es) throws IOException {
+ final IndexReader open = IndexReader.open(writer, true);
+ boolean success = false;
+ try {
+ SearcherManager manager = new NRTSearcherManager(writer, applyAllDeletes,
+ open, warmer, es);
+ success = true;
+ return manager;
+ } finally {
+ if (!success) {
+ open.close();
+ }
}
}
+
+ /**
+ * Creates and returns a new SearcherManager from the given {@link Directory}.
+ * @param dir the directory to open the IndexReader on.
+ * @param warmer optional {@link SearcherWarmer}. Pass
+ * null if you don't require the searcher to warmed
+ * before going live. If this is non-null then a
+ * merged segment warmer is installed on the
+ * provided IndexWriter's config.
+ * @param optional ExecutorService so different segments can
+ * be searched concurrently (see {@link
+ * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null
+ * to search segments sequentially.
+ *
+ * @throws IOException
+ */
+ public static SearcherManager open(Directory dir, SearcherWarmer warmer,
+ ExecutorService es) throws IOException {
+ final IndexReader open = IndexReader.open(dir, true);
+ boolean success = false;
+ try {
+ SearcherManager manager = new DirectorySearchManager(open, warmer, es);
+ success = true;
+ return manager;
+ } finally {
+ if (!success) {
+ open.close();
+ }
+ }
+ }
+
+ static final class NRTSearcherManager extends SearcherManager {
+ private final IndexWriter writer;
+ private final boolean applyDeletes;
+
+ NRTSearcherManager(final IndexWriter writer, final boolean applyDeletes,
+ final IndexReader openedReader, final SearcherWarmer warmer, final ExecutorService es)
+ throws IOException {
+ super(openedReader, warmer, es);
+ this.writer = writer;
+ this.applyDeletes = applyDeletes;
+ if (warmer != null) {
+ writer.getConfig().setMergedSegmentWarmer(
+ new IndexWriter.IndexReaderWarmer() {
+ @Override
+ public void warm(IndexReader reader) throws IOException {
+ warmer.warm(new IndexSearcher(reader, es));
+ }
+ });
+ }
+ }
+
+ @Override
+ protected IndexReader openIfChanged(IndexReader oldReader)
+ throws IOException {
+ return IndexReader.openIfChanged(oldReader, writer, applyDeletes);
+ }
+ }
+
+ static final class DirectorySearchManager extends SearcherManager {
+ DirectorySearchManager(IndexReader openedReader,
+ SearcherWarmer warmer, ExecutorService es) throws IOException {
+ super(openedReader, warmer, es);
+ }
+
+ @Override
+ protected IndexReader openIfChanged(IndexReader oldReader)
+ throws IOException {
+ return IndexReader.openIfChanged(oldReader, true);
+ }
+ }
}
Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java
===================================================================
--- lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java (revision 1179425)
+++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java (working copy)
@@ -80,12 +80,13 @@
*/
public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable {
+
private final NRTManager manager;
private final long targetMaxStaleNS;
private final long targetMinStaleNS;
private boolean finish;
- private boolean waitingNeedsDeletes;
private long waitingGen;
+ private boolean waitingNeedsDeletes;
/**
* Create NRTManagerReopenThread, to periodically reopen the NRT searcher.
@@ -131,7 +132,7 @@
}
@Override
- public void run() {
+ public void run() {
// TODO: maybe use private thread ticktock timer, in
// case clock shift messes up nanoTime?
long lastReopenStartNS = System.nanoTime();
@@ -140,8 +141,6 @@
try {
while (true) {
- final boolean doApplyDeletes;
-
boolean hasWaiting = false;
synchronized(this) {
@@ -176,16 +175,13 @@
//System.out.println("reopen: finish");
return;
}
-
- doApplyDeletes = hasWaiting ? waitingNeedsDeletes : true;
- waitingNeedsDeletes = false;
//System.out.println("reopen: start hasWaiting=" + hasWaiting);
}
lastReopenStartNS = System.nanoTime();
try {
//final long t0 = System.nanoTime();
- manager.reopen(doApplyDeletes);
+ manager.maybeReopen(waitingNeedsDeletes);
//System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec");
} catch (IOException ioe) {
//System.out.println(Thread.currentThread().getName() + ": IOE");
Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java
===================================================================
--- lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java (revision 1179425)
+++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java (working copy)
@@ -22,23 +22,25 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexReader; // javadocs
+import org.apache.lucene.index.IndexReader; // javadocs
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
+import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.SearcherWarmer;
+import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
-// TODO
-// - we could make this work also w/ "normal" reopen/commit?
-
/**
* Utility class to manage sharing near-real-time searchers
* across multiple searching threads.
*
- *
NOTE: to use this class, you must call reopen + *
NOTE: to use this class, you must call {@link #maybeReopen(boolean)}
* periodically. The {@link NRTManagerReopenThread} is a
* simple class to do this on a periodic basis. If you
* implement your own reopener, be sure to call {@link
@@ -50,16 +52,13 @@
public class NRTManager implements Closeable {
private final IndexWriter writer;
- private final ExecutorService es;
+ private final SearcherManagerRef withoutDeletes;
+ private final SearcherManagerRef withDeletes;
private final AtomicLong indexingGen;
- private final AtomicLong searchingGen;
- private final AtomicLong noDeletesSearchingGen;
- private final SearcherWarmer warmer;
private final List
+ * NOTE: caller must separately close the writer.
+ */
+ public synchronized void close() throws IOException {
+ reopenLock.lock();
+ try {
+ IOUtils.close(withDeletes, withoutDeletes);
+ newGeneration.signalAll();
+ } finally {
+ reopenLock.unlock();
+ }
}
-
- /** Release the searcher obtained from {@link
- * #get()} or {@link #get(long)}.
- *
- * NOTE: it's safe to call this after {@link
- * #close}. */
- public void release(IndexSearcher s) throws IOException {
- s.getIndexReader().decRef();
+
+ //nocommit javadoc
+ public IndexSearcher acquireLatest() {
+ return getSearcherManager(false).acquire();
}
-
- /** Call this when you need the NRT reader to reopen.
- *
- * @param applyDeletes If true, the newly opened reader
- * will reflect all deletes
- */
- public boolean reopen(boolean applyDeletes) throws IOException {
-
- // Mark gen as of when reopen started:
- final long newSearcherGen = indexingGen.getAndIncrement();
-
- if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) {
- //System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen);
- searchingGen.set(newSearcherGen);
- noDeletesSearchingGen.set(newSearcherGen);
- synchronized(this) {
- notifyAll();
+
+ //nocommit javadoc
+ public void release(IndexSearcher searcher) throws IOException {
+ // NOTE: doesn't matter if searcher was with or without
+ // deletes since in both cases this decRefs the IndexReader
+ getSearcherManager(false).release(searcher);
+ }
+
+ //nocommit javadoc
+ public SearcherManager getSearcherManager(boolean applyAllDeletes) {
+ if (applyAllDeletes) {
+ return withDeletes.manager;
+ } else {
+ if (withDeletes.generation > withoutDeletes.generation) {
+ return withDeletes.manager;
+ } else {
+ return withoutDeletes.manager;
}
- //System.out.println("reopen: skip: return");
- return false;
- } else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) {
- //System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen);
- noDeletesSearchingGen.set(newSearcherGen);
- synchronized(this) {
- notifyAll();
- }
- //System.out.println("reopen: skip: return");
- return false;
}
-
- //System.out.println("indexingGen now " + indexingGen);
-
- // .reopen() returns a new reference:
-
- // Start from whichever searcher is most current:
- final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher;
- IndexReader nextReader = IndexReader.openIfChanged(startSearcher.getIndexReader(), writer, applyDeletes);
- if (nextReader == null) {
- // NOTE: doesn't happen currently in Lucene (reopen on
- // NRT reader always returns new reader), but could in
- // the future:
- nextReader = startSearcher.getIndexReader();
- nextReader.incRef();
- }
-
- if (nextReader != startSearcher.getIndexReader()) {
- final IndexSearcher nextSearcher = new IndexSearcher(nextReader, es);
- if (warmer != null) {
- boolean success = false;
- try {
- warmer.warm(nextSearcher);
- success = true;
- } finally {
- if (!success) {
- nextReader.decRef();
- }
- }
- }
-
- // Transfer reference to swapSearcher:
- swapSearcher(nextSearcher,
- newSearcherGen,
- applyDeletes);
- return true;
- } else {
- return false;
- }
}
+
+ static final class SearcherManagerRef implements Closeable {
+ final boolean applyDeletes;
+ volatile long generation;
+ final SearcherManager manager;
- // Steals a reference from newSearcher:
- private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException {
- //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes);
+ SearcherManagerRef(boolean applyDeletes, long generation, SearcherManager manager) {
+ super();
+ this.applyDeletes = applyDeletes;
+ this.generation = generation;
+ this.manager = manager;
+ }
- // Always replace noDeletesCurrentSearcher:
- if (noDeletesCurrentSearcher != null) {
- noDeletesCurrentSearcher.getIndexReader().decRef();
+ public void close() throws IOException {
+ generation = Long.MAX_VALUE; // max it out to make sure nobody can wait on another gen
+ manager.close();
}
- noDeletesCurrentSearcher = newSearcher;
- assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen;
- noDeletesSearchingGen.set(newSearchingGen);
-
- if (applyDeletes) {
- // Deletes were applied, so we also update currentSearcher:
- if (currentSearcher != null) {
- currentSearcher.getIndexReader().decRef();
- }
- currentSearcher = newSearcher;
- if (newSearcher != null) {
- newSearcher.getIndexReader().incRef();
- }
- assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen;
- searchingGen.set(newSearchingGen);
- }
-
- notifyAll();
- //System.out.println(Thread.currentThread().getName() + ": done");
}
-
- /** Close this NRTManager to future searching. Any
- * searches still in process in other threads won't be
- * affected, and they should still call {@link #release}
- * after they are done.
- *
- * NOTE: caller must separately close the writer. */
- @Override
- public void close() throws IOException {
- swapSearcher(null, indexingGen.getAndIncrement(), true);
- }
}
false for
- * requireDeletes, you can get faster reopen time, but
- * the returned reader is allowed to not reflect all
- * deletions. See {@link IndexReader#open(IndexWriter,boolean)} */
- public synchronized IndexSearcher get(boolean requireDeletes) {
- final IndexSearcher s;
- if (requireDeletes) {
- s = currentSearcher;
- } else if (noDeletesSearchingGen.get() > searchingGen.get()) {
- s = noDeletesCurrentSearcher;
+ //nocommit javadoc
+ public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes, long time, TimeUnit unit) {
+ try {
+ reopenLock.lockInterruptibly();
+ try {
+ if (targetGen > getCurrentSearchingGen(requireDeletes)) {
+ for (WaitingListener listener : waitingListeners) {
+ listener.waiting(requireDeletes, targetGen);
+ }
+ while (targetGen > getCurrentSearchingGen(requireDeletes)) {
+ if (!waitOnGenCondition(time, unit)) {
+ return getSearcherManager(requireDeletes);
+ }
+ }
+ }
+
+ } finally {
+ reopenLock.unlock();
+ }
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ return getSearcherManager(requireDeletes);
+ }
+
+ //nocommit javadoc
+ private boolean waitOnGenCondition(long time, TimeUnit unit)
+ throws InterruptedException {
+ assert reopenLock.isHeldByCurrentThread();
+ if (time < 0) {
+ newGeneration.await();
+ return true;
} else {
- s = currentSearcher;
+ return newGeneration.await(time, unit);
}
- s.getIndexReader().incRef();
- return s;
}
- /** Call this if you require a searcher reflecting all
- * changes as of the target generation.
- *
- * @param targetGen Returned searcher must reflect changes
- * as of this generation
- */
- public synchronized IndexSearcher get(long targetGen) {
- return get(targetGen, true);
+ /** Returns generation of current searcher. */
+ public long getCurrentSearchingGen(boolean applyAllDeletes) {
+ if (applyAllDeletes) {
+ return withDeletes.generation;
+ } else {
+ return Math.max(withoutDeletes.generation, withDeletes.generation);
+ }
}
- /** Call this if you require a searcher reflecting all
- * changes as of the target generation, and you don't
- * require deletions to be reflected. Note that the
- * returned searcher may still reflect some or all
- * deletions.
- *
- * @param targetGen Returned searcher must reflect changes
- * as of this generation
- *
- * @param requireDeletes If true, the returned searcher must
- * reflect all deletions. This can be substantially more
- * costly than not applying deletes. Note that if you
- * pass false, it's still possible that some or all
- * deletes may have been applied.
- **/
- public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) {
-
- assert noDeletesSearchingGen.get() >= searchingGen.get(): "noDeletesSearchingGen=" + noDeletesSearchingGen.get() + " searchingGen=" + searchingGen.get();
-
- if (targetGen > getCurrentSearchingGen(requireDeletes)) {
- // Must wait
- //final long t0 = System.nanoTime();
- for(WaitingListener listener : waitingListeners) {
- listener.waiting(requireDeletes, targetGen);
- }
- while (targetGen > getCurrentSearchingGen(requireDeletes)) {
- //System.out.println(Thread.currentThread().getName() + ": wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes);
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
+ public boolean maybeReopen(boolean applyAllDeletes) throws IOException {
+ if (reopenLock.tryLock()) {
+ try {
+ final SearcherManagerRef reference = applyAllDeletes ? withDeletes : withoutDeletes;
+ // Mark gen as of when reopen started:
+ final long newSearcherGen = indexingGen.getAndIncrement();
+ boolean setSearchGen = false;
+ if (!(setSearchGen = reference.manager.isSearcherCurrent())) {
+ setSearchGen = reference.manager.maybeReopen();
}
+ if (setSearchGen) {
+ reference.generation = newSearcherGen;// update searcher gen
+ newGeneration.signalAll(); // wake up threads if we have a new generation
+ }
+ return setSearchGen;
+ } finally {
+ reopenLock.unlock();
}
- //final long waitNS = System.nanoTime()-t0;
- //System.out.println(Thread.currentThread().getName() + ": done wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes + " WAIT msec=" + (waitNS/1000000.0));
}
-
- return get(requireDeletes);
+ return false;
}
- /** Returns generation of current searcher. */
- public long getCurrentSearchingGen(boolean requiresDeletes) {
- return requiresDeletes ? searchingGen.get() : noDeletesSearchingGen.get();
+ /**
+ * Close this NRTManager to future searching. Any searches still in process in
+ * other threads won't be affected, and they should still call
+ * {@link #release} after they are done.
+ *
+ *