Index: lucene/contrib/CHANGES.txt =================================================================== --- lucene/contrib/CHANGES.txt (revision 1199565) +++ lucene/contrib/CHANGES.txt (working copy) @@ -96,6 +96,9 @@ Previously, there was no way to specify an IndexWriterConfig, and some of these methods would sneakily pass 'true' to optimize. (Robert Muir) + + * LUCENE-3558: Moved NRTManager & NRTManagerReopenThread into lucene core + o.a.l.search. (Simon Willnauer) New Features Index: lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java =================================================================== --- lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java (revision 1199565) +++ lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java (working copy) @@ -1,259 +0,0 @@ -package org.apache.lucene.search; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.lucene.analysis.MockAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.ConcurrentMergeScheduler; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; -import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec; -import org.apache.lucene.util.NamedThreadFactory; -import org.apache.lucene.util._TestUtil; - -@UseNoMemoryExpensiveCodec -public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase { - - boolean warmCalled; - - private SearcherLifetimeManager.Pruner pruner; - - public void testSearcherManager() throws Exception { - pruner = new SearcherLifetimeManager.PruneByAge(TEST_NIGHTLY ? _TestUtil.nextInt(random, 1, 20) : 1); - runTest("TestSearcherManager"); - } - - @Override - protected IndexSearcher getFinalSearcher() throws Exception { - if (!isNRT) { - writer.commit(); - } - assertTrue(mgr.maybeReopen() || mgr.isSearcherCurrent()); - return mgr.acquire(); - } - - private SearcherManager mgr; - private SearcherLifetimeManager lifetimeMGR; - private final List pastSearchers = new ArrayList(); - private boolean isNRT; - - @Override - protected void doAfterWriter(ExecutorService es) throws Exception { - final SearcherWarmer warmer = new SearcherWarmer() { - @Override - public void warm(IndexSearcher s) throws IOException { - TestSearcherManager.this.warmCalled = true; - s.search(new TermQuery(new Term("body", "united")), 10); - } - }; - if (random.nextBoolean()) { - // TODO: can we randomize the applyAllDeletes? But - // somehow for final searcher we must apply - // deletes... - mgr = new SearcherManager(writer, true, warmer, es); - isNRT = true; - } else { - // SearcherManager needs to see empty commit: - writer.commit(); - mgr = new SearcherManager(dir, warmer, es); - isNRT = false; - } - - - lifetimeMGR = new SearcherLifetimeManager(); - } - - @Override - protected void doSearching(ExecutorService es, final long stopTime) throws Exception { - - Thread reopenThread = new Thread() { - @Override - public void run() { - try { - while(System.currentTimeMillis() < stopTime) { - Thread.sleep(_TestUtil.nextInt(random, 1, 100)); - writer.commit(); - Thread.sleep(_TestUtil.nextInt(random, 1, 5)); - if (mgr.maybeReopen()) { - lifetimeMGR.prune(pruner); - } - } - } catch (Throwable t) { - System.out.println("TEST: reopen thread hit exc"); - t.printStackTrace(System.out); - failed.set(true); - throw new RuntimeException(t); - } - } - }; - reopenThread.setDaemon(true); - reopenThread.start(); - - runSearchThreads(stopTime); - - reopenThread.join(); - } - - @Override - protected IndexSearcher getCurrentSearcher() throws Exception { - if (random.nextInt(10) == 7) { - // NOTE: not best practice to call maybeReopen - // synchronous to your search threads, but still we - // test as apps will presumably do this for - // simplicity: - if (mgr.maybeReopen()) { - lifetimeMGR.prune(pruner); - } - } - - IndexSearcher s = null; - - synchronized(pastSearchers) { - while (pastSearchers.size() != 0 && random.nextDouble() < 0.25) { - // 1/4 of the time pull an old searcher, ie, simulate - // a user doing a follow-on action on a previous - // search (drilling down/up, clicking next/prev page, - // etc.) - final Long token = pastSearchers.get(random.nextInt(pastSearchers.size())); - s = lifetimeMGR.acquire(token); - if (s == null) { - // Searcher was pruned - pastSearchers.remove(token); - } else { - break; - } - } - } - - if (s == null) { - s = mgr.acquire(); - if (s.getIndexReader().numDocs() != 0) { - Long token = lifetimeMGR.record(s); - synchronized(pastSearchers) { - if (!pastSearchers.contains(token)) { - pastSearchers.add(token); - } - } - } - } - - return s; - } - - @Override - protected void releaseSearcher(IndexSearcher s) throws Exception { - s.getIndexReader().decRef(); - } - - @Override - protected void doClose() throws Exception { - assertTrue(warmCalled); - if (VERBOSE) { - System.out.println("TEST: now close SearcherManager"); - } - mgr.close(); - lifetimeMGR.close(); - } - - public void testIntermediateClose() throws IOException, InterruptedException { - Directory dir = newDirectory(); - // Test can deadlock if we use SMS: - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( - TEST_VERSION_CURRENT, new MockAnalyzer(random)).setMergeScheduler(new ConcurrentMergeScheduler())); - writer.addDocument(new Document()); - writer.commit(); - final CountDownLatch awaitEnterWarm = new CountDownLatch(1); - final CountDownLatch awaitClose = new CountDownLatch(1); - final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory("testIntermediateClose")); - final SearcherWarmer warmer = new SearcherWarmer() { - @Override - public void warm(IndexSearcher s) throws IOException { - try { - awaitEnterWarm.countDown(); - awaitClose.await(); - } catch (InterruptedException e) { - // - } - } - }; - final SearcherManager searcherManager = random.nextBoolean() ? new SearcherManager(dir, - warmer, es) : new SearcherManager(writer, random.nextBoolean(), warmer, es); - IndexSearcher searcher = searcherManager.acquire(); - try { - assertEquals(1, searcher.getIndexReader().numDocs()); - } finally { - searcherManager.release(searcher); - } - writer.addDocument(new Document()); - writer.commit(); - final AtomicBoolean success = new AtomicBoolean(false); - final AtomicBoolean triedReopen = new AtomicBoolean(false); - final Throwable[] exc = new Throwable[1]; - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - triedReopen.set(true); - searcherManager.maybeReopen(); - success.set(true); - } catch (AlreadyClosedException e) { - // expected - } catch (Throwable e) { - exc[0] = e; - // use success as the barrier here to make sure we see the write - success.set(false); - - } - } - }); - thread.start(); - awaitEnterWarm.await(); - for (int i = 0; i < 2; i++) { - searcherManager.close(); - } - awaitClose.countDown(); - thread.join(); - try { - searcherManager.acquire(); - fail("already closed"); - } catch (AlreadyClosedException ex) { - // expected - } - assertFalse(success.get()); - assertTrue(triedReopen.get()); - assertNull("" + exc[0], exc[0]); - writer.close(); - dir.close(); - if (es != null) { - es.shutdown(); - es.awaitTermination(1, TimeUnit.SECONDS); - } - } -} Index: lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java =================================================================== --- lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java (revision 1199565) +++ lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java (working copy) @@ -1,352 +0,0 @@ -package org.apache.lucene.index; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.MockAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.TextField; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.SearcherManager; -import org.apache.lucene.search.SearcherWarmer; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.LockObtainFailedException; -import org.apache.lucene.store.NRTCachingDirectory; -import org.apache.lucene.util.IOUtils; -import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec; -import org.apache.lucene.util.ThreadInterruptedException; - -@UseNoMemoryExpensiveCodec -public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase { - - private final ThreadLocal lastGens = new ThreadLocal(); - private boolean warmCalled; - - public void testNRTManager() throws Exception { - runTest("TestNRTManager"); - } - - @Override - protected IndexSearcher getFinalSearcher() throws Exception { - if (VERBOSE) { - System.out.println("TEST: finalSearcher maxGen=" + maxGen); - } - final SearcherManager manager = nrt.waitForGeneration(maxGen, true); - return manager.acquire(); - } - - @Override - protected Directory getDirectory(Directory in) { - // Randomly swap in NRTCachingDir - if (random.nextBoolean()) { - if (VERBOSE) { - System.out.println("TEST: wrap NRTCachingDir"); - } - - return new NRTCachingDirectory(in, 5.0, 60.0); - } else { - return in; - } - } - - @Override - protected void updateDocuments(Term id, List> docs) throws Exception { - final long gen = nrt.updateDocuments(id, docs); - - // Randomly verify the update "took": - if (random.nextInt(20) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); - } - SearcherManager manager = nrt.waitForGeneration(gen, true); - final IndexSearcher s = manager.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); - } finally { - manager.release(s); - } - } - - lastGens.set(gen); - } - - @Override - protected void addDocuments(Term id, List> docs) throws Exception { - final long gen = nrt.addDocuments(docs); - // Randomly verify the add "took": - if (random.nextInt(20) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); - } - final SearcherManager manager = nrt.waitForGeneration(gen, false); - final IndexSearcher s = manager.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); - } finally { - manager.release(s); - } - } - lastGens.set(gen); - } - - @Override - protected void addDocument(Term id, Iterable doc) throws Exception { - final long gen = nrt.addDocument(doc); - - // Randomly verify the add "took": - if (random.nextInt(20) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); - } - final SearcherManager manager = nrt.waitForGeneration(gen, false); - final IndexSearcher s = manager.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(1, s.search(new TermQuery(id), 10).totalHits); - } finally { - manager.release(s); - } - } - lastGens.set(gen); - } - - @Override - protected void updateDocument(Term id, Iterable doc) throws Exception { - final long gen = nrt.updateDocument(id, doc); - // Randomly verify the udpate "took": - if (random.nextInt(20) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); - } - final SearcherManager manager = nrt.waitForGeneration(gen, true); - final IndexSearcher s = manager.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(1, s.search(new TermQuery(id), 10).totalHits); - } finally { - manager.release(s); - } - } - lastGens.set(gen); - } - - @Override - protected void deleteDocuments(Term id) throws Exception { - final long gen = nrt.deleteDocuments(id); - // randomly verify the delete "took": - if (random.nextInt(20) == 7) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); - } - final SearcherManager manager = nrt.waitForGeneration(gen, true); - final IndexSearcher s = manager.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(0, s.search(new TermQuery(id), 10).totalHits); - } finally { - manager.release(s); - } - } - lastGens.set(gen); - } - - private NRTManager nrt; - private NRTManagerReopenThread nrtThread; - @Override - protected void doAfterWriter(ExecutorService es) throws Exception { - final double minReopenSec = 0.01 + 0.05 * random.nextDouble(); - final double maxReopenSec = minReopenSec * (1.0 + 10 * random.nextDouble()); - - if (VERBOSE) { - System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); - } - - nrt = new NRTManager(writer, es, - new SearcherWarmer() { - @Override - public void warm(IndexSearcher s) throws IOException { - TestNRTManager.this.warmCalled = true; - s.search(new TermQuery(new Term("body", "united")), 10); - } - }, false); - - nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec); - nrtThread.setName("NRT Reopen Thread"); - nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); - nrtThread.setDaemon(true); - nrtThread.start(); - } - - @Override - protected void doAfterIndexingThreadDone() { - Long gen = lastGens.get(); - if (gen != null) { - addMaxGen(gen); - } - } - - private long maxGen = -1; - - private synchronized void addMaxGen(long gen) { - maxGen = Math.max(gen, maxGen); - } - - @Override - protected void doSearching(ExecutorService es, long stopTime) throws Exception { - runSearchThreads(stopTime); - } - - @Override - protected IndexSearcher getCurrentSearcher() throws Exception { - // Test doesn't assert deletions until the end, so we - // can randomize whether dels must be applied - return nrt.getSearcherManager(random.nextBoolean()).acquire(); - } - - @Override - protected void releaseSearcher(IndexSearcher s) throws Exception { - // Test doesn't assert deletions until the end, so we - // can randomize whether dels must be applied - nrt.getSearcherManager(random.nextBoolean()).release(s); - } - - @Override - protected void doClose() throws Exception { - assertTrue(warmCalled); - if (VERBOSE) { - System.out.println("TEST: now close NRTManager"); - } - nrtThread.close(); - nrt.close(); - } - - /* - * LUCENE-3528 - NRTManager hangs in certain situations - */ - public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException { - IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); - Directory d = newDirectory(); - final CountDownLatch latch = new CountDownLatch(1); - final CountDownLatch signal = new CountDownLatch(1); - - LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal); - final NRTManager manager = new NRTManager(writer, null, null, false); - Document doc = new Document(); - doc.add(newField("test","test", TextField.TYPE_STORED)); - long gen = manager.addDocument(doc); - assertTrue(manager.maybeReopen(false)); - assertFalse(gen < manager.getCurrentSearchingGen(false)); - Thread t = new Thread() { - public void run() { - try { - signal.await(); - assertTrue(manager.maybeReopen(false)); - manager.deleteDocuments(new TermQuery(new Term("foo", "barista"))); - manager.maybeReopen(false); // kick off another reopen so we inc. the internal gen - } catch (Exception e) { - e.printStackTrace(); - } finally { - latch.countDown(); // let the add below finish - } - } - }; - t.start(); - writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through - final long lastGen = manager.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen - assertFalse(manager.getSearcherManager(false).isSearcherCurrent()); // false since there is a delete in the queue - - IndexSearcher acquire = manager.getSearcherManager(false).acquire(); - try { - assertEquals(2, acquire.getIndexReader().numDocs()); - } finally { - acquire.getIndexReader().decRef(); - } - NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01); - thread.start(); // start reopening - if (VERBOSE) { - System.out.println("waiting now for generation " + lastGen); - } - - final AtomicBoolean finished = new AtomicBoolean(false); - Thread waiter = new Thread() { - public void run() { - manager.waitForGeneration(lastGen, false); - finished.set(true); - } - }; - waiter.start(); - manager.maybeReopen(false); - waiter.join(1000); - if (!finished.get()) { - waiter.interrupt(); - fail("thread deadlocked on waitForGeneration"); - } - thread.close(); - thread.join(); - IOUtils.close(manager, writer, d); - } - - public static class LatchedIndexWriter extends IndexWriter { - - private CountDownLatch latch; - boolean waitAfterUpdate = false; - private CountDownLatch signal; - - public LatchedIndexWriter(Directory d, IndexWriterConfig conf, - CountDownLatch latch, CountDownLatch signal) - throws CorruptIndexException, LockObtainFailedException, IOException { - super(d, conf); - this.latch = latch; - this.signal = signal; - - } - - public void updateDocument(Term term, - Iterable doc, Analyzer analyzer) - throws CorruptIndexException, IOException { - super.updateDocument(term, doc, analyzer); - try { - if (waitAfterUpdate) { - signal.countDown(); - latch.await(); - } - } catch (InterruptedException e) { - throw new ThreadInterruptedException(e); - } - } - } -} Index: lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherLifetimeManager.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (revision 1199565) +++ lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (working copy) @@ -1,292 +0,0 @@ -package org.apache.lucene.search; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import org.apache.lucene.index.NRTManager; // javadocs -import org.apache.lucene.index.IndexReader; // javadocs -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.util.IOUtils; - -/** - * Keeps track of current plus old IndexSearchers, closing - * the old ones once they have timed out. - * - * Use it like this: - * - *
- *   SearcherLifetimeManager mgr = new SearcherLifetimeManager();
- * 
- * - * Per search-request, if it's a "new" search request, then - * obtain the latest searcher you have (for example, by - * using {@link SearcherManager} or {@link NRTManager}), and - * then record this searcher: - * - *
- *   // Record token into user's search results, eg as a
- *   // hidden HTML form field:
- *   long token = mgr.record(searcher);
- * 
- * - * When a follow-up search arrives, for example the user - * clicks next page, drills down/up, etc., take the token - * that you saved from the previous search and: - * - *
- *   // If possible, obtain same searcher version as last
- *   // search:
- *   IndexSearcher searcher = mgr.acquire(version);
- *   if (searcher != null) {
- *     // Searcher is still here
- *     try {
- *       // do searching...
- *     } finally {
- *       mgr.release(searcher);
- *       // Do not use searcher after this!
- *       searcher = null;
- *     }
- *   } else {
- *     // Searcher was pruned -- notify user session timed
- *     // out, or, pull fresh searcher again
- *   }
- * 
- * - * Finally, in a separate thread, ideally the same thread - * that's periodically reopening your searchers, you should - * periodically prune old searchers: - * - *
- *   mgr.prune(new PruneByAge(600.0);
- * 
- * - *

NOTE: keeping many searchers around means - * you'll use more resources (open files, RAM) than a single - * searcher. However, as long as you are using {@link - * IndexReader#openIfChanged}, the searchers will usually - * share almost all segments and the added resource usage is - * contained. When a large merge has completed, and - * you reopen, because that is a large change, the new - * searcher will use higher additional RAM than other - * searchers; but large merges don't complete very often and - * it's unlikely you'll hit two of them in your expiration - * window. Still you should budget plenty of heap in the - * JVM to have a good safety margin. - */ - -public class SearcherLifetimeManager implements Closeable { - - private static class SearcherTracker implements Comparable, Closeable { - public final IndexSearcher searcher; - public final long recordTimeSec; - public final long version; - - public SearcherTracker(IndexSearcher searcher) { - this.searcher = searcher; - version = searcher.getIndexReader().getVersion(); - searcher.getIndexReader().incRef(); - // Use nanoTime not currentTimeMillis since it [in - // theory] reduces risk from clock shift - recordTimeSec = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()); - } - - // Newer searchers are sort before older ones: - @Override - public int compareTo(SearcherTracker other) { - // Be defensive: cannot subtract since it could - // technically overflow long, though, we'd never hit - // that in practice: - if (recordTimeSec < other.recordTimeSec) { - return 1; - } else if (other.recordTimeSec < recordTimeSec) { - return -1; - } else { - return 0; - } - } - - @Override - public synchronized void close() throws IOException { - searcher.getIndexReader().decRef(); - } - } - - private volatile boolean closed; - - // TODO: we could get by w/ just a "set"; need to have - // Tracker hash by its version and have compareTo(Long) - // compare to its version - private final ConcurrentHashMap searchers = new ConcurrentHashMap(); - - private void ensureOpen() { - if (closed) { - throw new AlreadyClosedException("this SearcherLifetimeManager instance is closed"); - } - } - - /** Records that you are now using this IndexSearcher. - * Always call this when you've obtained a possibly new - * {@link IndexSearcher}, for example from one of the - * get methods in {@link NRTManager} or {@link - * SearcherManager}. It's fine if you already passed the - * same searcher to this method before. - * - *

This returns the long token that you can later pass - * to {@link #acquire} to retrieve the same IndexSearcher. - * You should record this long token in the search results - * sent to your user, such that if the user performs a - * follow-on action (clicks next page, drills down, etc.) - * the token is returned. */ - public long record(IndexSearcher searcher) throws IOException { - ensureOpen(); - // TODO: we don't have to use IR.getVersion to track; - // could be risky (if it's buggy); we could get better - // bug isolation if we assign our own private ID: - final long version = searcher.getIndexReader().getVersion(); - SearcherTracker tracker = searchers.get(version); - if (tracker == null) { - tracker = new SearcherTracker(searcher); - if (searchers.putIfAbsent(version, tracker) != null) { - // Another thread beat us -- must decRef to undo - // incRef done by SearcherTracker ctor: - tracker.close(); - } - } else if (tracker.searcher != searcher) { - throw new IllegalArgumentException("the provided searcher has the same underlying reader version yet the searcher instance differs from before (new=" + searcher + " vs old=" + tracker.searcher); - } - - return version; - } - - /** Retrieve a previously recorded {@link IndexSearcher}, if it - * has not yet been closed - * - *

NOTE: this may return null when the - * requested searcher has already timed out. When this - * happens you should notify your user that their session - * timed out and that they'll have to restart their - * search. - * - *

If this returns a non-null result, you must match - * later call {@link #release} on this searcher, best - * from a finally clause. */ - public IndexSearcher acquire(long version) { - ensureOpen(); - final SearcherTracker tracker = searchers.get(version); - if (tracker != null && - tracker.searcher.getIndexReader().tryIncRef()) { - return tracker.searcher; - } - - return null; - } - - /** Release a searcher previously obtained from {@link - * #acquire}. - * - *

NOTE: it's fine to call this after close. */ - public void release(IndexSearcher s) throws IOException { - s.getIndexReader().decRef(); - } - - /** See {@link #prune}. */ - public interface Pruner { - /** Return true if this searcher should be removed. - * @param ageSec how long ago this searcher was - * recorded vs the most recently recorded - * searcher - * @param searcher Searcher - **/ - public boolean doPrune(int ageSec, IndexSearcher searcher); - } - - /** Simple pruner that drops any searcher older by - * more than the specified seconds, than the newest - * searcher. */ - public final static class PruneByAge implements Pruner { - private final int maxAgeSec; - - public PruneByAge(int maxAgeSec) { - if (maxAgeSec < 1) { - throw new IllegalArgumentException("maxAgeSec must be > 0 (got " + maxAgeSec + ")"); - } - this.maxAgeSec = maxAgeSec; - } - - @Override - public boolean doPrune(int ageSec, IndexSearcher searcher) { - return ageSec > maxAgeSec; - } - } - - /** Calls provided {@link Pruner} to prune entries. The - * entries are passed to the Pruner in sorted (newest to - * oldest IndexSearcher) order. - * - *

NOTE: you must peridiocally call this, ideally - * from the same background thread that opens new - * searchers. */ - public synchronized void prune(Pruner pruner) throws IOException { - final List trackers = new ArrayList(searchers.values()); - Collections.sort(trackers); - final long newestSec = trackers.isEmpty() ? 0L : trackers.get(0).recordTimeSec; - for (SearcherTracker tracker: trackers) { - final int ageSec = (int) (newestSec - tracker.recordTimeSec); - assert ageSec >= 0; - if (pruner.doPrune(ageSec, tracker.searcher)) { - searchers.remove(tracker.version); - tracker.close(); - } - } - } - - /** Close this to future searching; any searches still in - * process in other threads won't be affected, and they - * should still call {@link #release} after they are - * done. - * - *

NOTE: you must ensure no other threads are - * calling {@link #record} while you call close(); - * otherwise it's possible not all searcher references - * will be freed. */ - @Override - public synchronized void close() throws IOException { - closed = true; - final List toClose = new ArrayList(searchers.values()); - - // Remove up front in case exc below, so we don't - // over-decRef on double-close: - for(SearcherTracker tracker : toClose) { - searchers.remove(tracker.version); - } - - IOUtils.close(toClose); - - // Make some effort to catch mis-use: - if (searchers.size() != 0) { - throw new IllegalStateException("another thread called record while this SearcherLifetimeManager instance was being closed; not all searchers were closed"); - } - } -} Index: lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java (revision 1199565) +++ lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java (working copy) @@ -1,254 +0,0 @@ -package org.apache.lucene.search; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Semaphore; - -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.NRTManager; // javadocs -import org.apache.lucene.search.IndexSearcher; // javadocs -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; - -/** - * Utility class to safely share {@link IndexSearcher} instances across multiple - * threads, while periodically reopening. This class ensures each searcher is - * closed only once all threads have finished using it. - * - *

- * Use {@link #acquire} to obtain the current searcher, and {@link #release} to - * release it, like this: - * - *

- * IndexSearcher s = manager.acquire();
- * try {
- *   // Do searching, doc retrieval, etc. with s
- * } finally {
- *   manager.release(s);
- * }
- * // Do not use s after this!
- * s = null;
- * 
- * - *

- * In addition you should periodically call {@link #maybeReopen}. While it's - * possible to call this just before running each query, this is discouraged - * since it penalizes the unlucky queries that do the reopen. It's better to use - * a separate background thread, that periodically calls maybeReopen. Finally, - * be sure to call {@link #close} once you are done. - * - *

- * NOTE: if you have an {@link IndexWriter}, it's better to use - * {@link NRTManager} since that class pulls near-real-time readers from the - * IndexWriter. - * - * @lucene.experimental - */ - -public final class SearcherManager { - - private volatile IndexSearcher currentSearcher; - private final ExecutorService es; - private final SearcherWarmer warmer; - private final Semaphore reopenLock = new Semaphore(1); - - /** - * Creates and returns a new SearcherManager from the given {@link IndexWriter}. - * @param writer the IndexWriter to open the IndexReader from. - * @param applyAllDeletes If true, all buffered deletes will - * be applied (made visible) in the {@link IndexSearcher} / {@link IndexReader}. - * If false, the deletes may or may not be applied, but remain buffered - * (in IndexWriter) so that they will be applied in the future. - * Applying deletes can be costly, so if your app can tolerate deleted documents - * being returned you might gain some performance by passing false. - * See {@link IndexReader#openIfChanged(IndexReader, IndexWriter, boolean)}. - * @param warmer An optional {@link SearcherWarmer}. Pass - * null if you don't require the searcher to warmed - * before going live. If this is non-null then a - * merged segment warmer is installed on the - * provided IndexWriter's config. - * @param es An optional {@link ExecutorService} so different segments can - * be searched concurrently (see {@link - * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null - * to search segments sequentially. - * - * @throws IOException - */ - public SearcherManager(IndexWriter writer, boolean applyAllDeletes, - final SearcherWarmer warmer, final ExecutorService es) throws IOException { - this.es = es; - this.warmer = warmer; - currentSearcher = new IndexSearcher(IndexReader.open(writer, applyAllDeletes)); - if (warmer != null) { - writer.getConfig().setMergedSegmentWarmer( - new IndexWriter.IndexReaderWarmer() { - @Override - public void warm(IndexReader reader) throws IOException { - warmer.warm(new IndexSearcher(reader, es)); - } - }); - } - } - - /** - * Creates and returns a new SearcherManager from the given {@link Directory}. - * @param dir the directory to open the IndexReader on. - * @param warmer An optional {@link SearcherWarmer}. Pass - * null if you don't require the searcher to warmed - * before going live. If this is non-null then a - * merged segment warmer is installed on the - * provided IndexWriter's config. - * @param es And optional {@link ExecutorService} so different segments can - * be searched concurrently (see {@link - * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null - * to search segments sequentially. - * - * @throws IOException - */ - public SearcherManager(Directory dir, SearcherWarmer warmer, - ExecutorService es) throws IOException { - this.es = es; - this.warmer = warmer; - currentSearcher = new IndexSearcher(IndexReader.open(dir, true), es); - } - - /** - * You must call this, periodically, to perform a reopen. This calls - * {@link IndexReader#openIfChanged(IndexReader)} with the underlying reader, and if that returns a - * new reader, it's warmed (if you provided a {@link SearcherWarmer} and then - * swapped into production. - * - *

- * Threads: it's fine for more than one thread to call this at once. - * Only the first thread will attempt the reopen; subsequent threads will see - * that another thread is already handling reopen and will return immediately. - * Note that this means if another thread is already reopening then subsequent - * threads will return right away without waiting for the reader reopen to - * complete. - *

- * - *

- * This method returns true if a new reader was in fact opened or - * if the current searcher has no pending changes. - *

- */ - public boolean maybeReopen() throws IOException { - ensureOpen(); - // Ensure only 1 thread does reopen at once; other - // threads just return immediately: - if (reopenLock.tryAcquire()) { - try { - // IR.openIfChanged preserves NRT and applyDeletes - // in the newly returned reader: - final IndexReader newReader = IndexReader.openIfChanged(currentSearcher.getIndexReader()); - if (newReader != null) { - final IndexSearcher newSearcher = new IndexSearcher(newReader, es); - boolean success = false; - try { - if (warmer != null) { - warmer.warm(newSearcher); - } - swapSearcher(newSearcher); - success = true; - } finally { - if (!success) { - release(newSearcher); - } - } - } - return true; - } finally { - reopenLock.release(); - } - } else { - return false; - } - } - - /** - * Returns true if no changes have occured since this searcher - * ie. reader was opened, otherwise false. - * @see IndexReader#isCurrent() - */ - public boolean isSearcherCurrent() throws CorruptIndexException, - IOException { - final IndexSearcher searcher = acquire(); - try { - return searcher.getIndexReader().isCurrent(); - } finally { - release(searcher); - } - } - - /** - * Release the searcher previously obtained with {@link #acquire}. - * - *

- * NOTE: it's safe to call this after {@link #close}. - */ - public void release(IndexSearcher searcher) throws IOException { - assert searcher != null; - searcher.getIndexReader().decRef(); - } - - /** - * Close this SearcherManager to future searching. Any searches still in - * process in other threads won't be affected, and they should still call - * {@link #release} after they are done. - */ - public synchronized void close() throws IOException { - if (currentSearcher != null) { - // make sure we can call this more than once - // closeable javadoc says: - // if this is already closed then invoking this method has no effect. - swapSearcher(null); - } - } - - /** - * Obtain the current IndexSearcher. You must match every call to acquire with - * one call to {@link #release}; it's best to do so in a finally clause. - */ - public IndexSearcher acquire() { - IndexSearcher searcher; - do { - if ((searcher = currentSearcher) == null) { - throw new AlreadyClosedException("this SearcherManager is closed"); - } - } while (!searcher.getIndexReader().tryIncRef()); - return searcher; - } - - private void ensureOpen() { - if (currentSearcher == null) { - throw new AlreadyClosedException("this SearcherManager is closed"); - } - } - - private synchronized void swapSearcher(IndexSearcher newSearcher) throws IOException { - ensureOpen(); - final IndexSearcher oldSearcher = currentSearcher; - currentSearcher = newSearcher; - release(oldSearcher); - } - -} Index: lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java (revision 1199565) +++ lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java (working copy) @@ -1,34 +0,0 @@ -package org.apache.lucene.search; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; - -import org.apache.lucene.index.NRTManager; // javadocs - -/** Pass an implementation of this to {@link NRTManager} or - * {@link SearcherManager} to warm a new {@link - * IndexSearcher} before it's put into production. - * - * @lucene.experimental */ - -public interface SearcherWarmer { - // TODO: can we somehow merge this w/ IW's - // IndexReaderWarmer.... should IW switch to this? - public void warm(IndexSearcher s) throws IOException; -} Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java (revision 1199565) +++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManagerReopenThread.java (working copy) @@ -1,198 +0,0 @@ -package org.apache.lucene.index; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.lucene.util.ThreadInterruptedException; - -/** - * Utility class that runs a reopen thread to periodically - * reopen the NRT searchers in the provided {@link - * NRTManager}. - * - *

Typical usage looks like this: - * - *

- *   ... open your own writer ...
- * 
- *   NRTManager manager = new NRTManager(writer);
- *
- *   // Refreshes searcher every 5 seconds when nobody is waiting, and up to 100 msec delay
- *   // when somebody is waiting:
- *   NRTManagerReopenThread reopenThread = new NRTManagerReopenThread(manager, 5.0, 0.1);
- *   reopenThread.setName("NRT Reopen Thread");
- *   reopenThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
- *   reopenThread.setDaemon(true);
- *   reopenThread.start();
- * 
- * - * Then, for each incoming query, do this: - * - *
- *   // For each incoming query:
- *   IndexSearcher searcher = manager.get();
- *   try {
- *     // Use searcher to search...
- *   } finally {
- *     manager.release(searcher);
- *   }
- * 
- * - * You should make changes using the NRTManager; if you later need to obtain - * a searcher reflecting those changes: - * - *
- *   // ... or updateDocument, deleteDocuments, etc:
- *   long gen = manager.addDocument(...);
- *   
- *   // Returned searcher is guaranteed to reflect the just added document
- *   IndexSearcher searcher = manager.get(gen);
- *   try {
- *     // Use searcher to search...
- *   } finally {
- *     manager.release(searcher);
- *   }
- * 
- * - * - * When you are done be sure to close both the manager and the reopen thrad: - *
 
- *   reopenThread.close();       
- *   manager.close();
- * 
- */ - -public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable { - - private final NRTManager manager; - private final long targetMaxStaleNS; - private final long targetMinStaleNS; - private boolean finish; - private long waitingGen; - private boolean waitingNeedsDeletes; - - /** - * Create NRTManagerReopenThread, to periodically reopen the NRT searcher. - * - * @param targetMaxStaleSec Maximum time until a new - * reader must be opened; this sets the upper bound - * on how slowly reopens may occur - * - * @param targetMinStaleSec Mininum time until a new - * reader can be opened; this sets the lower bound - * on how quickly reopens may occur, when a caller - * is waiting for a specific indexing change to - * become visible. - */ - - public NRTManagerReopenThread(NRTManager manager, double targetMaxStaleSec, double targetMinStaleSec) { - if (targetMaxStaleSec < targetMinStaleSec) { - throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")"); - } - this.manager = manager; - this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec); - this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec); - manager.addWaitingListener(this); - } - - public synchronized void close() { - //System.out.println("NRT: set finish"); - manager.removeWaitingListener(this); - this.finish = true; - notify(); - try { - join(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - public synchronized void waiting(boolean needsDeletes, long targetGen) { - waitingNeedsDeletes |= needsDeletes; - waitingGen = Math.max(waitingGen, targetGen); - notify(); - //System.out.println(Thread.currentThread().getName() + ": force wakeup waitingGen=" + waitingGen + " applyDeletes=" + applyDeletes + " waitingNeedsDeletes=" + waitingNeedsDeletes); - } - - @Override - public void run() { - // TODO: maybe use private thread ticktock timer, in - // case clock shift messes up nanoTime? - long lastReopenStartNS = System.nanoTime(); - - //System.out.println("reopen: start"); - try { - while (true) { - - boolean hasWaiting = false; - - synchronized(this) { - // TODO: try to guestimate how long reopen might - // take based on past data? - - while (!finish) { - //System.out.println("reopen: cycle"); - - // True if we have someone waiting for reopen'd searcher: - hasWaiting = waitingGen > manager.getCurrentSearchingGen(waitingNeedsDeletes); - final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); - - final long sleepNS = nextReopenStartNS - System.nanoTime(); - - if (sleepNS > 0) { - //System.out.println("reopen: sleep " + (sleepNS/1000000.0) + " ms (hasWaiting=" + hasWaiting + ")"); - try { - wait(sleepNS/1000000, (int) (sleepNS%1000000)); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - //System.out.println("NRT: set finish on interrupt"); - finish = true; - break; - } - } else { - break; - } - } - - if (finish) { - //System.out.println("reopen: finish"); - return; - } - //System.out.println("reopen: start hasWaiting=" + hasWaiting); - } - - lastReopenStartNS = System.nanoTime(); - try { - //final long t0 = System.nanoTime(); - manager.maybeReopen(waitingNeedsDeletes); - //System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec"); - } catch (IOException ioe) { - //System.out.println(Thread.currentThread().getName() + ": IOE"); - //ioe.printStackTrace(); - throw new RuntimeException(ioe); - } - } - } catch (Throwable t) { - //System.out.println("REOPEN EXC"); - //t.printStackTrace(System.out); - throw new RuntimeException(t); - } - } -} Index: lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java (revision 1199565) +++ lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java (working copy) @@ -1,355 +0,0 @@ -package org.apache.lucene.index; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.index.IndexReader; // javadocs -import org.apache.lucene.search.IndexSearcher; // javadocs -import org.apache.lucene.search.Query; -import org.apache.lucene.search.SearcherManager; -import org.apache.lucene.search.SearcherWarmer; -import org.apache.lucene.util.IOUtils; -import org.apache.lucene.util.ThreadInterruptedException; - -/** - * Utility class to manage sharing near-real-time searchers - * across multiple searching threads. - * - *

NOTE: to use this class, you must call {@link #maybeReopen(boolean)} - * periodically. The {@link NRTManagerReopenThread} is a - * simple class to do this on a periodic basis. If you - * implement your own reopener, be sure to call {@link - * #addWaitingListener} so your reopener is notified when a - * caller is waiting for a specific generation searcher.

- * - * @lucene.experimental - */ - -public class NRTManager implements Closeable { - private static final long MAX_SEARCHER_GEN = Long.MAX_VALUE; - private final IndexWriter writer; - private final SearcherManagerRef withoutDeletes; - private final SearcherManagerRef withDeletes; - private final AtomicLong indexingGen; - private final List waitingListeners = new CopyOnWriteArrayList(); - private final ReentrantLock reopenLock = new ReentrantLock(); - private final Condition newGeneration = reopenLock.newCondition(); - - /** - * Create new NRTManager. - * - * @param writer IndexWriter to open near-real-time - * readers - * @param es optional ExecutorService so different segments can - * be searched concurrently (see {@link IndexSearcher#IndexSearcher(IndexReader, ExecutorService)}. - * Pass null to search segments sequentially. - * @param warmer optional {@link SearcherWarmer}. Pass - * null if you don't require the searcher to warmed - * before going live. If this is non-null then a - * merged segment warmer is installed on the - * provided IndexWriter's config. - * - *

NOTE: the provided {@link SearcherWarmer} is - * not invoked for the initial searcher; you should - * warm it yourself if necessary. - */ - public NRTManager(IndexWriter writer, ExecutorService es, - SearcherWarmer warmer) throws IOException { - this(writer, es, warmer, true); - } - - /** - * Expert: just like {@link - * #NRTManager(IndexWriter,ExecutorService,SearcherWarmer)}, - * but you can also specify whether every searcher must - * apply deletes. This is useful for cases where certain - * uses can tolerate seeing some deleted docs, since - * reopen time is faster if deletes need not be applied. */ - public NRTManager(IndexWriter writer, ExecutorService es, - SearcherWarmer warmer, boolean alwaysApplyDeletes) throws IOException { - this.writer = writer; - if (alwaysApplyDeletes) { - withoutDeletes = withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es)); - } else { - withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es)); - withoutDeletes = new SearcherManagerRef(false, 0, new SearcherManager(writer, false, warmer, es)); - } - indexingGen = new AtomicLong(1); - } - - /** NRTManager invokes this interface to notify it when a - * caller is waiting for a specific generation searcher - * to be visible. */ - public static interface WaitingListener { - public void waiting(boolean requiresDeletes, long targetGen); - } - - /** Adds a listener, to be notified when a caller is - * waiting for a specific generation searcher to be - * visible. */ - public void addWaitingListener(WaitingListener l) { - waitingListeners.add(l); - } - - /** Remove a listener added with {@link - * #addWaitingListener}. */ - public void removeWaitingListener(WaitingListener l) { - waitingListeners.remove(l); - } - - public long updateDocument(Term t, Iterable d, Analyzer a) throws IOException { - writer.updateDocument(t, d, a); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long updateDocument(Term t, Iterable d) throws IOException { - writer.updateDocument(t, d); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long updateDocuments(Term t, Iterable> docs, Analyzer a) throws IOException { - writer.updateDocuments(t, docs, a); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long updateDocuments(Term t, Iterable> docs) throws IOException { - writer.updateDocuments(t, docs); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long deleteDocuments(Term t) throws IOException { - writer.deleteDocuments(t); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long deleteDocuments(Query q) throws IOException { - writer.deleteDocuments(q); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addDocument(Iterable d, Analyzer a) throws IOException { - writer.addDocument(d, a); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addDocuments(Iterable> docs, Analyzer a) throws IOException { - writer.addDocuments(docs, a); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addDocument(Iterable d) throws IOException { - writer.addDocument(d); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addDocuments(Iterable> docs) throws IOException { - writer.addDocuments(docs); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - /** - * Waits for a given {@link SearcherManager} target generation to be available - * via {@link #getSearcherManager(boolean)}. If the current generation is less - * than the given target generation this method will block until the - * correspondent {@link SearcherManager} is reopened by another thread via - * {@link #maybeReopen(boolean)} or until the {@link NRTManager} is closed. - * - * @param targetGen the generation to wait for - * @param requireDeletes true iff the generation requires deletes to be applied otherwise false - * @return the {@link SearcherManager} with the given target generation - */ - public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes) { - return waitForGeneration(targetGen, requireDeletes, -1, TimeUnit.NANOSECONDS); - } - - /** - * Waits for a given {@link SearcherManager} target generation to be available - * via {@link #getSearcherManager(boolean)}. If the current generation is less - * than the given target generation this method will block until the - * correspondent {@link SearcherManager} is reopened by another thread via - * {@link #maybeReopen(boolean)}, the given waiting time has elapsed, or until - * the {@link NRTManager} is closed. - *

- * NOTE: if the waiting time elapses before the requested target generation is - * available the latest {@link SearcherManager} is returned instead. - * - * @param targetGen - * the generation to wait for - * @param requireDeletes - * true iff the generation requires deletes to be - * applied otherwise false - * @param time - * the time to wait for the target generation - * @param unit - * the waiting time's time unit - * @return the {@link SearcherManager} with the given target generation or the - * latest {@link SearcherManager} if the waiting time elapsed before - * the requested generation is available. - */ - public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes, long time, TimeUnit unit) { - try { - final long curGen = indexingGen.get(); - if (targetGen > curGen) { - throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")"); - } - reopenLock.lockInterruptibly(); - try { - if (targetGen > getCurrentSearchingGen(requireDeletes)) { - for (WaitingListener listener : waitingListeners) { - listener.waiting(requireDeletes, targetGen); - } - while (targetGen > getCurrentSearchingGen(requireDeletes)) { - if (!waitOnGenCondition(time, unit)) { - return getSearcherManager(requireDeletes); - } - } - } - - } finally { - reopenLock.unlock(); - } - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - return getSearcherManager(requireDeletes); - } - - private boolean waitOnGenCondition(long time, TimeUnit unit) - throws InterruptedException { - assert reopenLock.isHeldByCurrentThread(); - if (time < 0) { - newGeneration.await(); - return true; - } else { - return newGeneration.await(time, unit); - } - } - - /** Returns generation of current searcher. */ - public long getCurrentSearchingGen(boolean applyAllDeletes) { - if (applyAllDeletes) { - return withDeletes.generation; - } else { - return Math.max(withoutDeletes.generation, withDeletes.generation); - } - } - - public boolean maybeReopen(boolean applyAllDeletes) throws IOException { - if (reopenLock.tryLock()) { - try { - final SearcherManagerRef reference = applyAllDeletes ? withDeletes : withoutDeletes; - // Mark gen as of when reopen started: - final long newSearcherGen = indexingGen.getAndIncrement(); - boolean setSearchGen = false; - if (reference.generation == MAX_SEARCHER_GEN) { - newGeneration.signalAll(); // wake up threads if we have a new generation - return false; - } - if (!(setSearchGen = reference.manager.isSearcherCurrent())) { - setSearchGen = reference.manager.maybeReopen(); - } - if (setSearchGen) { - reference.generation = newSearcherGen;// update searcher gen - newGeneration.signalAll(); // wake up threads if we have a new generation - } - return setSearchGen; - } finally { - reopenLock.unlock(); - } - } - return false; - } - - /** - * Close this NRTManager to future searching. Any searches still in process in - * other threads won't be affected, and they should still call - * {@link SearcherManager#release(IndexSearcher)} after they are done. - * - *

- * NOTE: caller must separately close the writer. - */ - public void close() throws IOException { - reopenLock.lock(); - try { - try { - IOUtils.close(withDeletes, withoutDeletes); - } finally { // make sure we signal even if close throws an exception - newGeneration.signalAll(); - } - } finally { - reopenLock.unlock(); - assert withDeletes.generation == MAX_SEARCHER_GEN && withoutDeletes.generation == MAX_SEARCHER_GEN; - } - } - - /** - * Returns a {@link SearcherManager}. If applyAllDeletes is - * true the returned manager is guaranteed to have all deletes - * applied on the last reopen. Otherwise the latest manager with or without deletes - * is returned. - */ - public SearcherManager getSearcherManager(boolean applyAllDeletes) { - if (applyAllDeletes) { - return withDeletes.manager; - } else { - if (withDeletes.generation > withoutDeletes.generation) { - return withDeletes.manager; - } else { - return withoutDeletes.manager; - } - } - } - - static final class SearcherManagerRef implements Closeable { - final boolean applyDeletes; - volatile long generation; - final SearcherManager manager; - - SearcherManagerRef(boolean applyDeletes, long generation, SearcherManager manager) { - super(); - this.applyDeletes = applyDeletes; - this.generation = generation; - this.manager = manager; - } - - public void close() throws IOException { - generation = MAX_SEARCHER_GEN; // max it out to make sure nobody can wait on another gen - manager.close(); - } - } -} Index: lucene/CHANGES.txt =================================================================== --- lucene/CHANGES.txt (revision 1199565) +++ lucene/CHANGES.txt (working copy) @@ -726,6 +726,9 @@ for more efficient storage of int arrays when the values are bounded, for example for storing the terms dict index (Toke Eskildsen via Mike McCandless) + +* LUCENE-3558: Moved SearcherManager, NRTManager & SearcherLifetimeManager into + core. All classes are contained in o.a.l.search. (Simon Willnauer) Optimizations Index: lucene/src/test/org/apache/lucene/search/TestNRTManager.java =================================================================== --- lucene/src/test/org/apache/lucene/search/TestNRTManager.java (revision 1199565) +++ lucene/src/test/org/apache/lucene/search/TestNRTManager.java (working copy) @@ -1,4 +1,4 @@ -package org.apache.lucene.index; +package org.apache.lucene.search; /** * Licensed to the Apache Software Foundation (ASF) under one or more @@ -27,10 +27,13 @@ import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.TextField; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.SearcherManager; -import org.apache.lucene.search.SearcherWarmer; -import org.apache.lucene.search.TermQuery; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; +import org.apache.lucene.search.NRTManagerReopenThread; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NRTCachingDirectory; Index: lucene/src/java/org/apache/lucene/search/SearcherWarmer.java =================================================================== --- lucene/src/java/org/apache/lucene/search/SearcherWarmer.java (revision 1199565) +++ lucene/src/java/org/apache/lucene/search/SearcherWarmer.java (working copy) @@ -19,7 +19,7 @@ import java.io.IOException; -import org.apache.lucene.index.NRTManager; // javadocs +import org.apache.lucene.search.NRTManager; // javadocs /** Pass an implementation of this to {@link NRTManager} or * {@link SearcherManager} to warm a new {@link Index: lucene/src/java/org/apache/lucene/search/SearcherManager.java =================================================================== --- lucene/src/java/org/apache/lucene/search/SearcherManager.java (revision 1199565) +++ lucene/src/java/org/apache/lucene/search/SearcherManager.java (working copy) @@ -24,7 +24,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.NRTManager; // javadocs +import org.apache.lucene.search.NRTManager; // javadocs import org.apache.lucene.search.IndexSearcher; // javadocs import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; Index: lucene/src/java/org/apache/lucene/search/NRTManagerReopenThread.java =================================================================== --- lucene/src/java/org/apache/lucene/search/NRTManagerReopenThread.java (revision 1199565) +++ lucene/src/java/org/apache/lucene/search/NRTManagerReopenThread.java (working copy) @@ -1,4 +1,4 @@ -package org.apache.lucene.index; +package org.apache.lucene.search; /** * Licensed to the Apache Software Foundation (ASF) under one or more @@ -77,6 +77,8 @@ * reopenThread.close(); * manager.close(); * + * + * @lucene.experimental */ public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable { Index: lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java =================================================================== --- lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (revision 1199565) +++ lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (working copy) @@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.lucene.index.NRTManager; // javadocs +import org.apache.lucene.search.NRTManager; // javadocs import org.apache.lucene.index.IndexReader; // javadocs import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; @@ -94,6 +94,8 @@ * it's unlikely you'll hit two of them in your expiration * window. Still you should budget plenty of heap in the * JVM to have a good safety margin. + * + * @lucene.experimental */ public class SearcherLifetimeManager implements Closeable { Index: lucene/src/java/org/apache/lucene/search/NRTManager.java =================================================================== --- lucene/src/java/org/apache/lucene/search/NRTManager.java (revision 1199565) +++ lucene/src/java/org/apache/lucene/search/NRTManager.java (working copy) @@ -1,4 +1,4 @@ -package org.apache.lucene.index; +package org.apache.lucene.search; /** * Licensed to the Apache Software Foundation (ASF) under one or more @@ -28,11 +28,11 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.index.IndexReader; // javadocs -import org.apache.lucene.search.IndexSearcher; // javadocs -import org.apache.lucene.search.Query; -import org.apache.lucene.search.SearcherManager; -import org.apache.lucene.search.SearcherWarmer; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.NRTManagerReopenThread; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException;