Index: lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java =================================================================== --- lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java (revision 1179425) +++ lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java (working copy) @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.analysis.MockAnalyzer; @@ -29,6 +31,7 @@ import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.NamedThreadFactory; import org.apache.lucene.util._TestUtil; public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase { @@ -41,25 +44,35 @@ @Override protected IndexSearcher getFinalSearcher() throws Exception { - writer.commit(); - mgr.maybeReopen(); + if (!isNRT) { + writer.commit(); + } + assertTrue(mgr.maybeReopen() || mgr.isSearcherCurrent()); return mgr.acquire(); } private SearcherManager mgr; + private boolean isNRT; @Override protected void doAfterWriter(ExecutorService es) throws Exception { // SearcherManager needs to see empty commit: - writer.commit(); - mgr = new SearcherManager(dir, - new SearcherWarmer() { - @Override - public void warm(IndexSearcher s) throws IOException { - TestSearcherManager.this.warmCalled = true; - s.search(new TermQuery(new Term("body", "united")), 10); - } - }, es); + final SearcherWarmer warmer = new SearcherWarmer() { + @Override + public void warm(IndexSearcher s) throws IOException { + TestSearcherManager.this.warmCalled = true; + s.search(new TermQuery(new Term("body", "united")), 10); + } + }; + if (random.nextBoolean()) { + mgr = SearcherManager.open(writer, true, warmer, es); + isNRT = true; + } else { + writer.commit(); + mgr = SearcherManager.open(dir, warmer, es); + isNRT = false; + } + } @Override @@ -126,19 +139,20 @@ writer.commit(); final CountDownLatch awaitEnterWarm = new CountDownLatch(1); final CountDownLatch awaitClose = new CountDownLatch(1); - - final SearcherManager searcherManager = new SearcherManager(dir, - new SearcherWarmer() { - @Override - public void warm(IndexSearcher s) throws IOException { - try { - awaitEnterWarm.countDown(); - awaitClose.await(); - } catch (InterruptedException e) { - // - } - } - }); + final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory("testIntermediateClose")); + final SearcherWarmer warmer = new SearcherWarmer() { + @Override + public void warm(IndexSearcher s) throws IOException { + try { + awaitEnterWarm.countDown(); + awaitClose.await(); + } catch (InterruptedException e) { + // + } + } + }; + final SearcherManager searcherManager = random.nextBoolean() ? SearcherManager.open(dir, + warmer, es) : SearcherManager.open(writer, random.nextBoolean(), warmer, es); IndexSearcher searcher = searcherManager.acquire(); try { assertEquals(1, searcher.getIndexReader().numDocs()); @@ -185,6 +199,9 @@ assertNull("" + exc[0], exc[0]); writer.close(); dir.close(); - + if (es != null) { + es.shutdown(); + es.awaitTermination(1, TimeUnit.SECONDS); + } } } Index: lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java =================================================================== --- lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java (revision 1179425) +++ lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java (working copy) @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.SearcherWarmer; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; @@ -41,7 +42,8 @@ if (VERBOSE) { System.out.println("TEST: finalSearcher maxGen=" + maxGen); } - return nrt.get(maxGen, true); + final SearcherManager manager = nrt.waitForGeneration(maxGen, true); + return manager.acquire(); } @Override @@ -67,14 +69,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - final IndexSearcher s = nrt.get(gen, true); + SearcherManager manager = nrt.waitForGeneration(gen, true); + final IndexSearcher s = manager.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } @@ -89,14 +92,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - final IndexSearcher s = nrt.get(gen, false); + final SearcherManager manager = nrt.waitForGeneration(gen, false); + final IndexSearcher s = manager.acquire();// nocommit get(gen, false); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } lastGens.set(gen); @@ -111,14 +115,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - final IndexSearcher s = nrt.get(gen, false); + final SearcherManager manager = nrt.waitForGeneration(gen, false); + final IndexSearcher s = manager.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(1, s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } lastGens.set(gen); @@ -132,14 +137,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - final IndexSearcher s = nrt.get(gen, true); + final SearcherManager manager = nrt.waitForGeneration(gen, true); + final IndexSearcher s = manager.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(1, s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } lastGens.set(gen); @@ -153,14 +159,15 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); } - final IndexSearcher s = nrt.get(gen, true); + final SearcherManager manager = nrt.waitForGeneration(gen, true); + final IndexSearcher s = manager.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); } try { assertEquals(0, s.search(new TermQuery(id), 10).totalHits); } finally { - nrt.release(s); + manager.release(s); } } lastGens.set(gen); @@ -168,7 +175,6 @@ private NRTManager nrt; private NRTManagerReopenThread nrtThread; - @Override protected void doAfterWriter(ExecutorService es) throws Exception { final double minReopenSec = 0.01 + 0.05 * random.nextDouble(); @@ -185,7 +191,8 @@ TestNRTManager.this.warmCalled = true; s.search(new TermQuery(new Term("body", "united")), 10); } - }); + }, false); + nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec); nrtThread.setName("NRT Reopen Thread"); nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); @@ -214,7 +221,7 @@ @Override protected IndexSearcher getCurrentSearcher() throws Exception { - return nrt.get(random.nextBoolean()); + return nrt.acquireLatest(); } @Override Index: lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java (revision 1179425) +++ lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java (working copy) @@ -17,11 +17,11 @@ * limitations under the License. */ -import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.NRTManager; // javadocs @@ -29,119 +29,83 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; -/** Utility class to safely share {@link IndexSearcher} instances - * across multiple threads, while periodically reopening. - * This class ensures each IndexSearcher instance is not - * closed until it is no longer needed. - * - *

Use {@link #acquire} to obtain the current searcher, and - * {@link #release} to release it, like this: - * - *

- *    IndexSearcher s = manager.acquire();
- *    try {
- *      // Do searching, doc retrieval, etc. with s
- *    } finally {
- *      manager.release(s);
- *    }
- *    // Do not use s after this!
- *    s = null;
- *  
- * - *

In addition you should periodically call {@link - * #maybeReopen}. While it's possible to call this just - * before running each query, this is discouraged since it - * penalizes the unlucky queries that do the reopen. It's - * better to use a separate background thread, that - * periodically calls maybeReopen. Finally, be sure to - * call {@link #close} once you are done. - * - *

NOTE: if you have an {@link IndexWriter}, it's - * better to use {@link NRTManager} since that class pulls - * near-real-time readers from the IndexWriter. - * - * @lucene.experimental +/** + * Utility class to safely share {@link IndexSearcher} instances across multiple + * threads, while periodically reopening. This class ensures each searcher is + * closed only once all threads have finished using it. + * + *

+ * Use {@link #acquire} to obtain the current searcher, and {@link #release} to + * release it, like this: + * + *

+ * IndexSearcher s = manager.acquire();
+ * try {
+ *   // Do searching, doc retrieval, etc. with s
+ * } finally {
+ *   manager.release(s);
+ * }
+ * // Do not use s after this!
+ * s = null;
+ * 
+ * + *

+ * In addition you should periodically call {@link #maybeReopen}. While it's + * possible to call this just before running each query, this is discouraged + * since it penalizes the unlucky queries that do the reopen. It's better to use + * a separate background thread, that periodically calls maybeReopen. Finally, + * be sure to call {@link #close} once you are done. + * + *

+ * NOTE: if you have an {@link IndexWriter}, it's better to use + * {@link NRTManager} since that class pulls near-real-time readers from the + * IndexWriter. + * + * @lucene.experimental */ -public class SearcherManager implements Closeable { +public abstract class SearcherManager { - // Current searcher - private volatile IndexSearcher currentSearcher; - private final SearcherWarmer warmer; - private final Semaphore reopening = new Semaphore(1); - private final ExecutorService es; - - /** Opens an initial searcher from the Directory. - * - * @param dir Directory to open the searcher from - * - * @param warmer optional {@link SearcherWarmer}. Pass - * null if you don't require the searcher to warmed - * before going live. - * - *

NOTE: the provided {@link SearcherWarmer} is - * not invoked for the initial searcher; you should - * warm it yourself if necessary. - */ - public SearcherManager(Directory dir, SearcherWarmer warmer) throws IOException { - this(dir, warmer, null); - } - - /** Opens an initial searcher from the Directory. - * - * @param dir Directory to open the searcher from - * - * @param warmer optional {@link SearcherWarmer}. Pass - * null if you don't require the searcher to warmed - * before going live. - * - * @param es optional ExecutorService so different segments can - * be searched concurrently (see {@link - * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null - * to search segments sequentially. - * - *

NOTE: the provided {@link SearcherWarmer} is - * not invoked for the initial searcher; you should - * warm it yourself if necessary. - */ - public SearcherManager(Directory dir, SearcherWarmer warmer, ExecutorService es) throws IOException { + protected volatile IndexSearcher currentSearcher; + protected final ExecutorService es; + protected final SearcherWarmer warmer; + protected final Semaphore reopenLock = new Semaphore(1); + + protected SearcherManager(IndexReader openedReader, SearcherWarmer warmer, + ExecutorService es) throws IOException { this.es = es; - currentSearcher = new IndexSearcher(IndexReader.open(dir), this.es); this.warmer = warmer; + currentSearcher = new IndexSearcher(openedReader, es); } - /** You must call this, periodically, to perform a - * reopen. This calls {@link IndexReader#openIfChanged} on the - * underlying reader, and if that returns a new reader, - * it's warmed (if you provided a {@link SearcherWarmer} - * and then swapped into production. - * - *

Threads: it's fine for more than one thread to - * call this at once. Only the first thread will attempt - * the reopen; subsequent threads will see that another - * thread is already handling reopen and will return - * immediately. Note that this means if another thread - * is already reopening then subsequent threads will - * return right away without waiting for the reader - * reopen to complete.

- * - *

This method returns true if a new reader was in - * fact opened.

+ /** + * You must call this, periodically, to perform a reopen. This calls + * {@link IndexReader#reopen} on the underlying reader, and if that returns a + * new reader, it's warmed (if you provided a {@link SearcherWarmer} and then + * swapped into production. + * + *

+ * Threads: it's fine for more than one thread to call this at once. + * Only the first thread will attempt the reopen; subsequent threads will see + * that another thread is already handling reopen and will return immediately. + * Note that this means if another thread is already reopening then subsequent + * threads will return right away without waiting for the reader reopen to + * complete. + *

+ * + *

+ * This method returns true if a new reader was in fact opened. + *

*/ - public boolean maybeReopen() - throws IOException { - - if (currentSearcher == null) { - throw new AlreadyClosedException("this SearcherManager is closed"); - } - + public boolean maybeReopen() throws IOException { + ensureOpen(); // Ensure only 1 thread does reopen at once; other // threads just return immediately: - if (reopening.tryAcquire()) { + if (reopenLock.tryAcquire()) { try { - IndexReader newReader = IndexReader.openIfChanged(currentSearcher.getIndexReader()); + final IndexReader newReader = openIfChanged(currentSearcher.getIndexReader()); if (newReader != null) { - IndexSearcher newSearcher = new IndexSearcher(newReader, es); + final IndexSearcher newSearcher = new IndexSearcher(newReader, es); boolean success = false; try { if (warmer != null) { @@ -159,16 +123,57 @@ return false; } } finally { - reopening.release(); + reopenLock.release(); } } else { return false; } } - /** Obtain the current IndexSearcher. You must match - * every call to acquire with one call to {@link #release}; - * it's best to do so in a finally clause. */ + /** + * Returns true if no changes have occured since this searcher + * ie. reader was opened, otherwise false. + * @see IndexReader#isCurrent() + */ + public boolean isSearcherCurrent() throws CorruptIndexException, + IOException { + final IndexSearcher searcher = acquire(); + try { + return searcher.getIndexReader().isCurrent(); + } finally { + release(searcher); + } + } + + /** + * Release the searcher previously obtained with {@link #acquire}. + * + *

+ * NOTE: it's safe to call this after {@link #close}. + */ + public void release(IndexSearcher searcher) throws IOException { + assert searcher != null; + searcher.getIndexReader().decRef(); + } + + /** + * Close this SearcherManager to future searching. Any searches still in + * process in other threads won't be affected, and they should still call + * {@link #release} after they are done. + */ + public synchronized void close() throws IOException { + if (currentSearcher != null) { + // make sure we can call this more than once + // closeable javadoc says: + // if this is already closed then invoking this method has no effect. + swapSearcher(null); + } + } + + /** + * Obtain the current IndexSearcher. You must match every call to acquire with + * one call to {@link #release}; it's best to do so in a finally clause. + */ public IndexSearcher acquire() { IndexSearcher searcher; do { @@ -177,40 +182,130 @@ } } while (!searcher.getIndexReader().tryIncRef()); return searcher; - } - - /** Release the searcher previously obtained with {@link - * #acquire}. - * - *

NOTE: it's safe to call this after {@link - * #close}. */ - public void release(IndexSearcher searcher) - throws IOException { - searcher.getIndexReader().decRef(); } - // Replaces old searcher with new one - needs to be synced to make close() work - private synchronized void swapSearcher(IndexSearcher newSearcher) - throws IOException { - IndexSearcher oldSearcher = currentSearcher; - if (oldSearcher == null) { + private void ensureOpen() { + if (currentSearcher == null) { throw new AlreadyClosedException("this SearcherManager is closed"); } + } + + protected synchronized void swapSearcher(IndexSearcher newSearcher) throws IOException { + ensureOpen(); + final IndexSearcher oldSearcher = currentSearcher; currentSearcher = newSearcher; release(oldSearcher); } - /** Close this SearcherManager to future searching. Any - * searches still in process in other threads won't be - * affected, and they should still call {@link #release} - * after they are done. */ - @Override - public synchronized void close() throws IOException { - if (currentSearcher != null) { - // make sure we can call this more than once - // closeable javadoc says: - // if this is already closed then invoking this method has no effect. - swapSearcher(null); + protected abstract IndexReader openIfChanged(IndexReader oldReader) + throws IOException; + + /** + * Creates and returns a new SearcherManager from the given {@link IndexWriter}. + * @param writer the IndexWriter to open the IndexReader from. + * @param applyAllDeletes If true, all buffered deletes will + * be applied (made visible) in the {@link IndexSearcher} / {@link IndexReader}. + * If false, the deletes are not applied but remain buffered + * (in IndexWriter) so that they will be applied in the future. + * Applying deletes can be costly, so if your app can tolerate deleted documents + * being returned you might gain some performance by passing false. + * @param warmer optional {@link SearcherWarmer}. Pass + * null if you don't require the searcher to warmed + * before going live. If this is non-null then a + * merged segment warmer is installed on the + * provided IndexWriter's config. + * @param optional ExecutorService so different segments can + * be searched concurrently (see {@link + * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null + * to search segments sequentially. + * + * @see IndexReader#openIfChanged(IndexReader, IndexWriter, boolean) + * @throws IOException + */ + public static SearcherManager open(IndexWriter writer, boolean applyAllDeletes, + SearcherWarmer warmer, ExecutorService es) throws IOException { + final IndexReader open = IndexReader.open(writer, true); + boolean success = false; + try { + SearcherManager manager = new NRTSearcherManager(writer, applyAllDeletes, + open, warmer, es); + success = true; + return manager; + } finally { + if (!success) { + open.close(); + } } } + + /** + * Creates and returns a new SearcherManager from the given {@link Directory}. + * @param dir the directory to open the IndexReader on. + * @param warmer optional {@link SearcherWarmer}. Pass + * null if you don't require the searcher to warmed + * before going live. If this is non-null then a + * merged segment warmer is installed on the + * provided IndexWriter's config. + * @param optional ExecutorService so different segments can + * be searched concurrently (see {@link + * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null + * to search segments sequentially. + * + * @throws IOException + */ + public static SearcherManager open(Directory dir, SearcherWarmer warmer, + ExecutorService es) throws IOException { + final IndexReader open = IndexReader.open(dir, true); + boolean success = false; + try { + SearcherManager manager = new DirectorySearchManager(open, warmer, es); + success = true; + return manager; + } finally { + if (!success) { + open.close(); + } + } + } + + static final class NRTSearcherManager extends SearcherManager { + private final IndexWriter writer; + private final boolean applyDeletes; + + NRTSearcherManager(final IndexWriter writer, final boolean applyDeletes, + final IndexReader openedReader, final SearcherWarmer warmer, final ExecutorService es) + throws IOException { + super(openedReader, warmer, es); + this.writer = writer; + this.applyDeletes = applyDeletes; + if (warmer != null) { + writer.getConfig().setMergedSegmentWarmer( + new IndexWriter.IndexReaderWarmer() { + @Override + public void warm(IndexReader reader) throws IOException { + warmer.warm(new IndexSearcher(reader, es)); + } + }); + } + } + + @Override + protected IndexReader openIfChanged(IndexReader oldReader) + throws IOException { + return IndexReader.openIfChanged(oldReader, writer, applyDeletes); + } + } + + static final class DirectorySearchManager extends SearcherManager { + DirectorySearchManager(IndexReader openedReader, + SearcherWarmer warmer, ExecutorService es) throws IOException { + super(openedReader, warmer, es); + } + + @Override + protected IndexReader openIfChanged(IndexReader oldReader) + throws IOException { + return IndexReader.openIfChanged(oldReader, true); + } + } } Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java (revision 1179425) +++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java (working copy) @@ -80,12 +80,13 @@ */ public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable { + private final NRTManager manager; private final long targetMaxStaleNS; private final long targetMinStaleNS; private boolean finish; - private boolean waitingNeedsDeletes; private long waitingGen; + private boolean waitingNeedsDeletes; /** * Create NRTManagerReopenThread, to periodically reopen the NRT searcher. @@ -131,7 +132,7 @@ } @Override - public void run() { + public void run() { // TODO: maybe use private thread ticktock timer, in // case clock shift messes up nanoTime? long lastReopenStartNS = System.nanoTime(); @@ -140,8 +141,6 @@ try { while (true) { - final boolean doApplyDeletes; - boolean hasWaiting = false; synchronized(this) { @@ -176,16 +175,13 @@ //System.out.println("reopen: finish"); return; } - - doApplyDeletes = hasWaiting ? waitingNeedsDeletes : true; - waitingNeedsDeletes = false; //System.out.println("reopen: start hasWaiting=" + hasWaiting); } lastReopenStartNS = System.nanoTime(); try { //final long t0 = System.nanoTime(); - manager.reopen(doApplyDeletes); + manager.maybeReopen(waitingNeedsDeletes); //System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec"); } catch (IOException ioe) { //System.out.println(Thread.currentThread().getName() + ": IOE"); Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java (revision 1179425) +++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java (working copy) @@ -22,23 +22,25 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.index.IndexReader; // javadocs +import org.apache.lucene.index.IndexReader; // javadocs import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.SearcherWarmer; +import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; -// TODO -// - we could make this work also w/ "normal" reopen/commit? - /** * Utility class to manage sharing near-real-time searchers * across multiple searching threads. * - *

NOTE: to use this class, you must call reopen + *

NOTE: to use this class, you must call {@link #maybeReopen(boolean)} * periodically. The {@link NRTManagerReopenThread} is a * simple class to do this on a periodic basis. If you * implement your own reopener, be sure to call {@link @@ -50,16 +52,13 @@ public class NRTManager implements Closeable { private final IndexWriter writer; - private final ExecutorService es; + private final SearcherManagerRef withoutDeletes; + private final SearcherManagerRef withDeletes; private final AtomicLong indexingGen; - private final AtomicLong searchingGen; - private final AtomicLong noDeletesSearchingGen; - private final SearcherWarmer warmer; private final List waitingListeners = new CopyOnWriteArrayList(); + private final ReentrantLock reopenLock = new ReentrantLock(); + private final Condition newGeneration = reopenLock.newCondition(); - private volatile IndexSearcher currentSearcher; - private volatile IndexSearcher noDeletesCurrentSearcher; - /** * Create new NRTManager. * @@ -79,29 +78,30 @@ * not invoked for the initial searcher; you should * warm it yourself if necessary. */ - public NRTManager(IndexWriter writer, ExecutorService es, SearcherWarmer warmer) throws IOException { + public NRTManager(IndexWriter writer, ExecutorService es, + SearcherWarmer warmer) throws IOException { + this(writer, es, warmer, true); + } + /** + * Expert: just like {@link + * NRTManager(IndexWriter,ExecutorService,SearcherWarmer), + * but you can also specify whether every searcher must + * apply deletes. This is useful for cases where certain + * uses can tolerate seeing some deleted docs, since + * reopen time is faster if deletes need not be applied. */ + public NRTManager(IndexWriter writer, ExecutorService es, + SearcherWarmer warmer, boolean alwaysApplyDeletes) throws IOException { this.writer = writer; - this.es = es; - this.warmer = warmer; - indexingGen = new AtomicLong(1); - searchingGen = new AtomicLong(-1); - noDeletesSearchingGen = new AtomicLong(-1); - - // Create initial reader: - swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true); - - if (this.warmer != null) { - writer.getConfig().setMergedSegmentWarmer( - new IndexWriter.IndexReaderWarmer() { - @Override - public void warm(IndexReader reader) throws IOException { - NRTManager.this.warmer.warm(new IndexSearcher(reader, NRTManager.this.es)); - } - }); + if (alwaysApplyDeletes) { + withoutDeletes = withDeletes = new SearcherManagerRef(true, 0, SearcherManager.open(writer, true, warmer, es)); + } else { + withDeletes = new SearcherManagerRef(true, 0, SearcherManager.open(writer, true, warmer, es)); + withoutDeletes = new SearcherManagerRef(false, 0, SearcherManager.open(writer, false, warmer, es)); } + indexingGen = new AtomicLong(1); } - + /** NRTManager invokes this interface to notify it when a * caller is waiting for a specific generation searcher * to be visible. */ @@ -181,202 +181,139 @@ // Return gen as of when indexing finished: return indexingGen.get(); } - - /** Returns the most current searcher. If you require a - * certain indexing generation be visible in the returned - * searcher, call {@link #get(long)} - * instead. - */ - public synchronized IndexSearcher get() { - return get(true); + + + //nocommit javadoc + public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes) { + return waitForGeneration(targetGen, requireDeletes, -1, TimeUnit.NANOSECONDS); } - /** Just like {@link #get}, but by passing false for - * requireDeletes, you can get faster reopen time, but - * the returned reader is allowed to not reflect all - * deletions. See {@link IndexReader#open(IndexWriter,boolean)} */ - public synchronized IndexSearcher get(boolean requireDeletes) { - final IndexSearcher s; - if (requireDeletes) { - s = currentSearcher; - } else if (noDeletesSearchingGen.get() > searchingGen.get()) { - s = noDeletesCurrentSearcher; + //nocommit javadoc + public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes, long time, TimeUnit unit) { + try { + reopenLock.lockInterruptibly(); + try { + if (targetGen > getCurrentSearchingGen(requireDeletes)) { + for (WaitingListener listener : waitingListeners) { + listener.waiting(requireDeletes, targetGen); + } + while (targetGen > getCurrentSearchingGen(requireDeletes)) { + if (!waitOnGenCondition(time, unit)) { + return getSearcherManager(requireDeletes); + } + } + } + + } finally { + reopenLock.unlock(); + } + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + return getSearcherManager(requireDeletes); + } + + //nocommit javadoc + private boolean waitOnGenCondition(long time, TimeUnit unit) + throws InterruptedException { + assert reopenLock.isHeldByCurrentThread(); + if (time < 0) { + newGeneration.await(); + return true; } else { - s = currentSearcher; + return newGeneration.await(time, unit); } - s.getIndexReader().incRef(); - return s; } - /** Call this if you require a searcher reflecting all - * changes as of the target generation. - * - * @param targetGen Returned searcher must reflect changes - * as of this generation - */ - public synchronized IndexSearcher get(long targetGen) { - return get(targetGen, true); + /** Returns generation of current searcher. */ + public long getCurrentSearchingGen(boolean applyAllDeletes) { + if (applyAllDeletes) { + return withDeletes.generation; + } else { + return Math.max(withoutDeletes.generation, withDeletes.generation); + } } - /** Call this if you require a searcher reflecting all - * changes as of the target generation, and you don't - * require deletions to be reflected. Note that the - * returned searcher may still reflect some or all - * deletions. - * - * @param targetGen Returned searcher must reflect changes - * as of this generation - * - * @param requireDeletes If true, the returned searcher must - * reflect all deletions. This can be substantially more - * costly than not applying deletes. Note that if you - * pass false, it's still possible that some or all - * deletes may have been applied. - **/ - public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) { - - assert noDeletesSearchingGen.get() >= searchingGen.get(): "noDeletesSearchingGen=" + noDeletesSearchingGen.get() + " searchingGen=" + searchingGen.get(); - - if (targetGen > getCurrentSearchingGen(requireDeletes)) { - // Must wait - //final long t0 = System.nanoTime(); - for(WaitingListener listener : waitingListeners) { - listener.waiting(requireDeletes, targetGen); - } - while (targetGen > getCurrentSearchingGen(requireDeletes)) { - //System.out.println(Thread.currentThread().getName() + ": wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes); - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); + public boolean maybeReopen(boolean applyAllDeletes) throws IOException { + if (reopenLock.tryLock()) { + try { + final SearcherManagerRef reference = applyAllDeletes ? withDeletes : withoutDeletes; + // Mark gen as of when reopen started: + final long newSearcherGen = indexingGen.getAndIncrement(); + boolean setSearchGen = false; + if (!(setSearchGen = reference.manager.isSearcherCurrent())) { + setSearchGen = reference.manager.maybeReopen(); } + if (setSearchGen) { + reference.generation = newSearcherGen;// update searcher gen + newGeneration.signalAll(); // wake up threads if we have a new generation + } + return setSearchGen; + } finally { + reopenLock.unlock(); } - //final long waitNS = System.nanoTime()-t0; - //System.out.println(Thread.currentThread().getName() + ": done wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes + " WAIT msec=" + (waitNS/1000000.0)); } - - return get(requireDeletes); + return false; } - /** Returns generation of current searcher. */ - public long getCurrentSearchingGen(boolean requiresDeletes) { - return requiresDeletes ? searchingGen.get() : noDeletesSearchingGen.get(); + /** + * Close this NRTManager to future searching. Any searches still in process in + * other threads won't be affected, and they should still call + * {@link #release} after they are done. + * + *

+ * NOTE: caller must separately close the writer. + */ + public synchronized void close() throws IOException { + reopenLock.lock(); + try { + IOUtils.close(withDeletes, withoutDeletes); + newGeneration.signalAll(); + } finally { + reopenLock.unlock(); + } } - - /** Release the searcher obtained from {@link - * #get()} or {@link #get(long)}. - * - *

NOTE: it's safe to call this after {@link - * #close}. */ - public void release(IndexSearcher s) throws IOException { - s.getIndexReader().decRef(); + + //nocommit javadoc + public IndexSearcher acquireLatest() { + return getSearcherManager(false).acquire(); } - - /** Call this when you need the NRT reader to reopen. - * - * @param applyDeletes If true, the newly opened reader - * will reflect all deletes - */ - public boolean reopen(boolean applyDeletes) throws IOException { - - // Mark gen as of when reopen started: - final long newSearcherGen = indexingGen.getAndIncrement(); - - if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) { - //System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen); - searchingGen.set(newSearcherGen); - noDeletesSearchingGen.set(newSearcherGen); - synchronized(this) { - notifyAll(); + + //nocommit javadoc + public void release(IndexSearcher searcher) throws IOException { + // NOTE: doesn't matter if searcher was with or without + // deletes since in both cases this decRefs the IndexReader + getSearcherManager(false).release(searcher); + } + + //nocommit javadoc + public SearcherManager getSearcherManager(boolean applyAllDeletes) { + if (applyAllDeletes) { + return withDeletes.manager; + } else { + if (withDeletes.generation > withoutDeletes.generation) { + return withDeletes.manager; + } else { + return withoutDeletes.manager; } - //System.out.println("reopen: skip: return"); - return false; - } else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) { - //System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen); - noDeletesSearchingGen.set(newSearcherGen); - synchronized(this) { - notifyAll(); - } - //System.out.println("reopen: skip: return"); - return false; } - - //System.out.println("indexingGen now " + indexingGen); - - // .reopen() returns a new reference: - - // Start from whichever searcher is most current: - final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher; - IndexReader nextReader = IndexReader.openIfChanged(startSearcher.getIndexReader(), writer, applyDeletes); - if (nextReader == null) { - // NOTE: doesn't happen currently in Lucene (reopen on - // NRT reader always returns new reader), but could in - // the future: - nextReader = startSearcher.getIndexReader(); - nextReader.incRef(); - } - - if (nextReader != startSearcher.getIndexReader()) { - final IndexSearcher nextSearcher = new IndexSearcher(nextReader, es); - if (warmer != null) { - boolean success = false; - try { - warmer.warm(nextSearcher); - success = true; - } finally { - if (!success) { - nextReader.decRef(); - } - } - } - - // Transfer reference to swapSearcher: - swapSearcher(nextSearcher, - newSearcherGen, - applyDeletes); - return true; - } else { - return false; - } } + + static final class SearcherManagerRef implements Closeable { + final boolean applyDeletes; + volatile long generation; + final SearcherManager manager; - // Steals a reference from newSearcher: - private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException { - //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes); + SearcherManagerRef(boolean applyDeletes, long generation, SearcherManager manager) { + super(); + this.applyDeletes = applyDeletes; + this.generation = generation; + this.manager = manager; + } - // Always replace noDeletesCurrentSearcher: - if (noDeletesCurrentSearcher != null) { - noDeletesCurrentSearcher.getIndexReader().decRef(); + public void close() throws IOException { + generation = Long.MAX_VALUE; // max it out to make sure nobody can wait on another gen + manager.close(); } - noDeletesCurrentSearcher = newSearcher; - assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen; - noDeletesSearchingGen.set(newSearchingGen); - - if (applyDeletes) { - // Deletes were applied, so we also update currentSearcher: - if (currentSearcher != null) { - currentSearcher.getIndexReader().decRef(); - } - currentSearcher = newSearcher; - if (newSearcher != null) { - newSearcher.getIndexReader().incRef(); - } - assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen; - searchingGen.set(newSearchingGen); - } - - notifyAll(); - //System.out.println(Thread.currentThread().getName() + ": done"); } - - /** Close this NRTManager to future searching. Any - * searches still in process in other threads won't be - * affected, and they should still call {@link #release} - * after they are done. - * - *

NOTE: caller must separately close the writer. */ - @Override - public void close() throws IOException { - swapSearcher(null, indexingGen.getAndIncrement(), true); - } }