Index: lucene/core/src/java/org/apache/lucene/search/NRTManagerReopenThread.java =================================================================== --- lucene/core/src/java/org/apache/lucene/search/NRTManagerReopenThread.java (revision 1476852) +++ lucene/core/src/java/org/apache/lucene/search/NRTManagerReopenThread.java (working copy) @@ -1,200 +0,0 @@ -package org.apache.lucene.search; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.lucene.util.ThreadInterruptedException; - -/** - * Utility class that runs a reopen thread to periodically - * reopen the NRT searchers in the provided {@link - * NRTManager}. - * - *

Typical usage looks like this: - * - *

- *   ... open your own writer ...
- * 
- *   NRTManager manager = new NRTManager(writer);
- *
- *   // Refreshes searcher every 5 seconds when nobody is waiting, and up to 100 msec delay
- *   // when somebody is waiting:
- *   NRTManagerReopenThread reopenThread = new NRTManagerReopenThread(manager, 5.0, 0.1);
- *   reopenThread.setName("NRT Reopen Thread");
- *   reopenThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
- *   reopenThread.setDaemon(true);
- *   reopenThread.start();
- * 
- * - * Then, for each incoming query, do this: - * - *
- *   // For each incoming query:
- *   IndexSearcher searcher = manager.get();
- *   try {
- *     // Use searcher to search...
- *   } finally {
- *     manager.release(searcher);
- *   }
- * 
- * - * You should make changes using the NRTManager; if you later need to obtain - * a searcher reflecting those changes: - * - *
- *   // ... or updateDocument, deleteDocuments, etc:
- *   long gen = manager.addDocument(...);
- *   
- *   // Returned searcher is guaranteed to reflect the just added document
- *   IndexSearcher searcher = manager.get(gen);
- *   try {
- *     // Use searcher to search...
- *   } finally {
- *     manager.release(searcher);
- *   }
- * 
- * - * - * When you are done be sure to close both the manager and the reopen thread: - *
 
- *   reopenThread.close();       
- *   manager.close();
- * 
- * - * @lucene.experimental - */ - -public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable { - - private final NRTManager manager; - private final long targetMaxStaleNS; - private final long targetMinStaleNS; - private boolean finish; - private long waitingGen; - - /** - * Create NRTManagerReopenThread, to periodically reopen the NRT searcher. - * - * @param targetMaxStaleSec Maximum time until a new - * reader must be opened; this sets the upper bound - * on how slowly reopens may occur - * - * @param targetMinStaleSec Mininum time until a new - * reader can be opened; this sets the lower bound - * on how quickly reopens may occur, when a caller - * is waiting for a specific indexing change to - * become visible. - */ - - public NRTManagerReopenThread(NRTManager manager, double targetMaxStaleSec, double targetMinStaleSec) { - if (targetMaxStaleSec < targetMinStaleSec) { - throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")"); - } - this.manager = manager; - this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec); - this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec); - manager.addWaitingListener(this); - } - - @Override - public synchronized void close() { - //System.out.println("NRT: set finish"); - manager.removeWaitingListener(this); - this.finish = true; - notify(); - try { - join(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - @Override - public synchronized void waiting(long targetGen) { - waitingGen = Math.max(waitingGen, targetGen); - notify(); - //System.out.println(Thread.currentThread().getName() + ": force wakeup waitingGen=" + waitingGen + " applyDeletes=" + applyDeletes); - } - - @Override - public void run() { - // TODO: maybe use private thread ticktock timer, in - // case clock shift messes up nanoTime? - long lastReopenStartNS = System.nanoTime(); - - //System.out.println("reopen: start"); - try { - while (true) { - - boolean hasWaiting = false; - - synchronized(this) { - // TODO: try to guestimate how long reopen might - // take based on past data? - - while (!finish) { - //System.out.println("reopen: cycle"); - - // True if we have someone waiting for reopen'd searcher: - hasWaiting = waitingGen > manager.getCurrentSearchingGen(); - final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); - - final long sleepNS = nextReopenStartNS - System.nanoTime(); - - if (sleepNS > 0) { - //System.out.println("reopen: sleep " + (sleepNS/1000000.0) + " ms (hasWaiting=" + hasWaiting + ")"); - try { - wait(sleepNS/1000000, (int) (sleepNS%1000000)); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - //System.out.println("NRT: set finish on interrupt"); - finish = true; - break; - } - } else { - break; - } - } - - if (finish) { - //System.out.println("reopen: finish"); - return; - } - //System.out.println("reopen: start hasWaiting=" + hasWaiting); - } - - lastReopenStartNS = System.nanoTime(); - try { - //final long t0 = System.nanoTime(); - manager.maybeRefresh(); - //System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec"); - } catch (IOException ioe) { - //System.out.println(Thread.currentThread().getName() + ": IOE"); - //ioe.printStackTrace(); - throw new RuntimeException(ioe); - } - } - } catch (Throwable t) { - //System.out.println("REOPEN EXC"); - //t.printStackTrace(System.out); - throw new RuntimeException(t); - } - } -} Index: lucene/core/src/java/org/apache/lucene/search/SearcherLifetimeManager.java =================================================================== --- lucene/core/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (revision 1476852) +++ lucene/core/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (working copy) @@ -24,7 +24,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import org.apache.lucene.search.NRTManager; // javadocs import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; @@ -41,8 +40,8 @@ * * Per search-request, if it's a "new" search request, then * obtain the latest searcher you have (for example, by - * using {@link SearcherManager} or {@link NRTManager}), and - * then record this searcher: + * using {@link SearcherManager}), and then record this + * searcher: * *
  *   // Record the current searcher, and save the returend
@@ -143,8 +142,7 @@
 
   /** Records that you are now using this IndexSearcher.
    *  Always call this when you've obtained a possibly new
-   *  {@link IndexSearcher}, for example from one of the
-   *  get methods in {@link NRTManager} or {@link
+   *  {@link IndexSearcher}, for example from {@link
    *  SearcherManager}.  It's fine if you already passed the
    *  same searcher to this method before.
    *
Index: lucene/core/src/java/org/apache/lucene/search/NRTManager.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/search/NRTManager.java	(revision 1476852)
+++ lucene/core/src/java/org/apache/lucene/search/NRTManager.java	(working copy)
@@ -1,404 +0,0 @@
-package org.apache.lucene.search;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexDocument;
-import org.apache.lucene.index.IndexReader; // javadocs
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher; // javadocs
-import org.apache.lucene.search.SearcherFactory; // javadocs
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.ThreadInterruptedException;
-
-/**
- * Utility class to manage sharing near-real-time searchers
- * across multiple searching thread.  The difference vs
- * SearcherManager is that this class enables individual
- * requests to wait until specific indexing changes are
- * visible.
- *
- * 

You must create an IndexWriter, then create a {@link - * NRTManager.TrackingIndexWriter} from it, and pass that to the - * NRTManager. You may want to create two NRTManagers, once - * that always applies deletes on refresh and one that does - * not. In this case you should use a single {@link - * NRTManager.TrackingIndexWriter} instance for both. - * - *

Then, use {@link #acquire} to obtain the - * {@link IndexSearcher}, and {@link #release} (ideally, - * from within a finally clause) to release it. - * - *

NOTE: to use this class, you must call {@link #maybeRefresh()} - * periodically. The {@link NRTManagerReopenThread} is a - * simple class to do this on a periodic basis, and reopens - * more quickly if a request is waiting. If you implement - * your own reopener, be sure to call {@link - * #addWaitingListener} so your reopener is notified when a - * caller is waiting for a specific generation - * searcher.

- * - * @see SearcherFactory - * - * @lucene.experimental - */ - -public final class NRTManager extends ReferenceManager { - private static final long MAX_SEARCHER_GEN = Long.MAX_VALUE; - private final TrackingIndexWriter writer; - private final List waitingListeners = new CopyOnWriteArrayList(); - private final ReentrantLock genLock = new ReentrantLock();; - private final Condition newGeneration = genLock.newCondition(); - private final SearcherFactory searcherFactory; - - private volatile long searchingGen; - - /** - * Create new NRTManager. - * - * @param writer TrackingIndexWriter to open near-real-time - * readers - * @param searcherFactory An optional {@link SearcherFactory}. Pass - * null if you don't require the searcher to be warmed - * before going live or other custom behavior. - */ - public NRTManager(TrackingIndexWriter writer, SearcherFactory searcherFactory) throws IOException { - this(writer, searcherFactory, true); - } - - /** - * Expert: just like {@link - * #NRTManager(TrackingIndexWriter,SearcherFactory)}, - * but you can also specify whether each reopened searcher must - * apply deletes. This is useful for cases where certain - * uses can tolerate seeing some deleted docs, since - * reopen time is faster if deletes need not be applied. */ - public NRTManager(TrackingIndexWriter writer, SearcherFactory searcherFactory, boolean applyAllDeletes) throws IOException { - this.writer = writer; - if (searcherFactory == null) { - searcherFactory = new SearcherFactory(); - } - this.searcherFactory = searcherFactory; - current = SearcherManager.getSearcher(searcherFactory, DirectoryReader.open(writer.getIndexWriter(), applyAllDeletes)); - } - - @Override - protected void decRef(IndexSearcher reference) throws IOException { - reference.getIndexReader().decRef(); - } - - @Override - protected boolean tryIncRef(IndexSearcher reference) { - return reference.getIndexReader().tryIncRef(); - } - - /** NRTManager invokes this interface to notify it when a - * caller is waiting for a specific generation searcher - * to be visible. */ - public static interface WaitingListener { - public void waiting(long targetGen); - } - - /** Adds a listener, to be notified when a caller is - * waiting for a specific generation searcher to be - * visible. */ - public void addWaitingListener(WaitingListener l) { - waitingListeners.add(l); - } - - /** Remove a listener added with {@link - * #addWaitingListener}. */ - public void removeWaitingListener(WaitingListener l) { - waitingListeners.remove(l); - } - - /** Class that tracks changes to a delegated - * IndexWriter. Create this class (passing your - * IndexWriter), and then pass this class to NRTManager. - * Be sure to make all changes via the - * TrackingIndexWriter, otherwise NRTManager won't know - * about the changes. - * - * @lucene.experimental */ - public static class TrackingIndexWriter { - private final IndexWriter writer; - private final AtomicLong indexingGen = new AtomicLong(1); - - public TrackingIndexWriter(IndexWriter writer) { - this.writer = writer; - } - - public long updateDocument(Term t, IndexDocument d, Analyzer a) throws IOException { - writer.updateDocument(t, d, a); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long updateDocument(Term t, IndexDocument d) throws IOException { - writer.updateDocument(t, d); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long updateDocuments(Term t, Iterable docs, Analyzer a) throws IOException { - writer.updateDocuments(t, docs, a); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long updateDocuments(Term t, Iterable docs) throws IOException { - writer.updateDocuments(t, docs); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long deleteDocuments(Term t) throws IOException { - writer.deleteDocuments(t); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long deleteDocuments(Term... terms) throws IOException { - writer.deleteDocuments(terms); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long deleteDocuments(Query q) throws IOException { - writer.deleteDocuments(q); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long deleteDocuments(Query... queries) throws IOException { - writer.deleteDocuments(queries); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long deleteAll() throws IOException { - writer.deleteAll(); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addDocument(IndexDocument d, Analyzer a) throws IOException { - writer.addDocument(d, a); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addDocuments(Iterable docs, Analyzer a) throws IOException { - writer.addDocuments(docs, a); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addDocument(IndexDocument d) throws IOException { - writer.addDocument(d); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addDocuments(Iterable docs) throws IOException { - writer.addDocuments(docs); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addIndexes(Directory... dirs) throws IOException { - writer.addIndexes(dirs); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long addIndexes(IndexReader... readers) throws IOException { - writer.addIndexes(readers); - // Return gen as of when indexing finished: - return indexingGen.get(); - } - - public long getGeneration() { - return indexingGen.get(); - } - - public IndexWriter getIndexWriter() { - return writer; - } - - long getAndIncrementGeneration() { - return indexingGen.getAndIncrement(); - } - - public long tryDeleteDocument(IndexReader reader, int docID) throws IOException { - if (writer.tryDeleteDocument(reader, docID)) { - return indexingGen.get(); - } else { - return -1; - } - } - } - - /** - * Waits for the target generation to become visible in - * the searcher. - * If the current searcher is older than the - * target generation, this method will block - * until the searcher is reopened, by another via - * {@link #maybeRefresh} or until the {@link NRTManager} is closed. - * - * @param targetGen the generation to wait for - */ - public void waitForGeneration(long targetGen) { - waitForGeneration(targetGen, -1, TimeUnit.NANOSECONDS); - } - - /** - * Waits for the target generation to become visible in - * the searcher. If the current searcher is older than - * the target generation, this method will block until the - * searcher has been reopened by another thread via - * {@link #maybeRefresh}, the given waiting time has elapsed, or until - * the NRTManager is closed. - *

- * NOTE: if the waiting time elapses before the requested target generation is - * available the current {@link SearcherManager} is returned instead. - * - * @param targetGen - * the generation to wait for - * @param time - * the time to wait for the target generation - * @param unit - * the waiting time's time unit - */ - public void waitForGeneration(long targetGen, long time, TimeUnit unit) { - try { - final long curGen = writer.getGeneration(); - if (targetGen > curGen) { - throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")"); - } - genLock.lockInterruptibly(); - try { - if (targetGen > searchingGen) { - for (WaitingListener listener : waitingListeners) { - listener.waiting(targetGen); - } - while (targetGen > searchingGen) { - if (!waitOnGenCondition(time, unit)) { - return; - } - } - } - } finally { - genLock.unlock(); - } - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - private boolean waitOnGenCondition(long time, TimeUnit unit) - throws InterruptedException { - assert genLock.isHeldByCurrentThread(); - if (time < 0) { - newGeneration.await(); - return true; - } else { - return newGeneration.await(time, unit); - } - } - - /** Returns generation of current searcher. */ - public long getCurrentSearchingGen() { - return searchingGen; - } - - private long lastRefreshGen; - - @Override - protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException { - // Record gen as of when reopen started: - lastRefreshGen = writer.getAndIncrementGeneration(); - final IndexReader r = referenceToRefresh.getIndexReader(); - assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r; - final DirectoryReader dirReader = (DirectoryReader) r; - IndexSearcher newSearcher = null; - if (!dirReader.isCurrent()) { - final IndexReader newReader = DirectoryReader.openIfChanged(dirReader); - if (newReader != null) { - newSearcher = SearcherManager.getSearcher(searcherFactory, newReader); - } - } - - return newSearcher; - } - - @Override - protected void afterMaybeRefresh() { - genLock.lock(); - try { - if (searchingGen != MAX_SEARCHER_GEN) { - // update searchingGen: - assert lastRefreshGen >= searchingGen; - searchingGen = lastRefreshGen; - } - // wake up threads if we have a new generation: - newGeneration.signalAll(); - } finally { - genLock.unlock(); - } - } - - @Override - protected synchronized void afterClose() throws IOException { - genLock.lock(); - try { - // max it out to make sure nobody can wait on another gen - searchingGen = MAX_SEARCHER_GEN; - newGeneration.signalAll(); - } finally { - genLock.unlock(); - } - } - - /** - * Returns true if no changes have occured since this searcher - * ie. reader was opened, otherwise false. - * @see DirectoryReader#isCurrent() - */ - public boolean isSearcherCurrent() throws IOException { - final IndexSearcher searcher = acquire(); - try { - final IndexReader r = searcher.getIndexReader(); - assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r; - return ((DirectoryReader) r).isCurrent(); - } finally { - release(searcher); - } - } -} Index: lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java =================================================================== --- lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java (working copy) @@ -0,0 +1,235 @@ +package org.apache.lucene.search; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.index.TrackingIndexWriter; +import org.apache.lucene.util.ThreadInterruptedException; + +/** Utility class to manage reopens of a {@link + * ReferenceManager} with methods to wait for a specific + * generation to become visible. To use this class you + * must first wrap your {@link IndexWriter} with a {@link + * TrackingIndexWriter}. Always use the + * TrackingIndexWriter to make changes to the index, and + * save the returned generation. Then, when a given search + * request needs to see a specific index change, call the + * {#waitForGeneration} to wait for that change to be + * visible. + * + * @lucene.experimental */ + +public class ControlledRealTimeReopenThread extends Thread implements Closeable { + + private final ReferenceManager manager; + private final long targetMaxStaleNS; + private final long targetMinStaleNS; + private final TrackingIndexWriter writer; + private volatile boolean finish; + private volatile long waitingGen; + private volatile long searchingGen; + private long refreshStartGen; + + private final ReentrantLock reopenLock = new ReentrantLock(); + private final Condition reopenCond = reopenLock.newCondition(); + + /** + * Create ControlledRealTimeReopenThread, to periodically + * reopen the NRT searcher. + * + * @param targetMaxStaleSec Maximum time until a new + * reader must be opened; this sets the upper bound + * on how slowly reopens may occur, when no + * caller is waiting for a specific generation to + * become visible. + * + * @param targetMinStaleSec Mininum time until a new + * reader can be opened; this sets the lower bound + * on how quickly reopens may occur, when a caller + * is waiting for a specific generation to + * become visible. + */ + public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager manager, double targetMaxStaleSec, double targetMinStaleSec) { + if (targetMaxStaleSec < targetMinStaleSec) { + throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")"); + } + this.writer = writer; + this.manager = manager; + this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec); + this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec); + manager.addListener(new HandleRefresh()); + } + + private class HandleRefresh implements ReferenceManager.RefreshListener { + @Override + public void beforeRefresh() { + } + + @Override + public void afterRefresh(boolean didRefresh) { + refreshDone(didRefresh); + } + } + + private synchronized void refreshDone(boolean didRefresh) { + searchingGen = refreshStartGen; + notifyAll(); + } + + @Override + public synchronized void close() { + //System.out.println("NRT: set finish"); + + finish = true; + + // So thread wakes up and notices it should finish: + reopenLock.lock(); + try { + reopenCond.signal(); + } finally { + reopenLock.unlock(); + } + + try { + join(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + + // Max it out so any waiting search threads will return: + searchingGen = Long.MAX_VALUE; + notifyAll(); + } + + /** + * Waits for the target generation to become visible in + * the searcher. + * If the current searcher is older than the + * target generation, this method will block + * until the searcher is reopened, by another via + * {@link #maybeRefresh} or until the {@link NRTManager} is closed. + * + * @param targetGen the generation to wait for + */ + public void waitForGeneration(long targetGen) throws InterruptedException { + waitForGeneration(targetGen, -1); + } + + /** + * Waits for the target generation to become visible in + * the searcher. If the current searcher is older than + * the target generation, this method will block until the + * searcher has been reopened by another thread via + * {@link #maybeRefresh}, the given waiting time has elapsed, or until + * the NRTManager is closed. + *

+ * NOTE: if the waiting time elapses before the requested target generation is + * available the current {@link SearcherManager} is returned instead. + * + * @param targetGen + * the generation to wait for + * @param maxMS + * maximum milliseconds to wait, or -1 to wait indefinitely + * @return true if the targetGeneration is now available, + * or false if maxMS wait time was exceeded + */ + public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException { + final long curGen = writer.getGeneration(); + if (targetGen > curGen) { + throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")"); + } + if (targetGen > searchingGen) { + waitingGen = Math.max(waitingGen, targetGen); + + // Notify the reopen thread that the waitingGen has + // changed, so it may wake up and realize it should + // not sleep for much or any longer before reopening: + reopenLock.lock(); + try { + reopenCond.signal(); + } finally { + reopenLock.unlock(); + } + + long startMS = System.nanoTime()/1000000; + + while (targetGen > searchingGen) { + if (maxMS < 0) { + wait(); + } else { + long msLeft = ((startMS + maxMS) - (System.nanoTime())/1000000); + if (msLeft <= 0) { + return false; + } else { + wait(msLeft); + } + } + } + } + + return true; + } + + @Override + public void run() { + // TODO: maybe use private thread ticktock timer, in + // case clock shift messes up nanoTime? + long lastReopenStartNS = System.nanoTime(); + + //System.out.println("reopen: start"); + while (!finish) { + + // TODO: try to guestimate how long reopen might + // take based on past data? + + // True if we have someone waiting for reopenedd searcher: + boolean hasWaiting = waitingGen > searchingGen; + final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); + + final long sleepNS = nextReopenStartNS - System.nanoTime(); + + if (sleepNS > 0) { + reopenLock.lock(); + try { + reopenCond.awaitNanos(sleepNS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } finally { + reopenLock.unlock(); + } + } + + if (finish) { + break; + } + + lastReopenStartNS = System.nanoTime(); + refreshStartGen = writer.getAndIncrementGeneration(); + try { + manager.maybeRefreshBlocking(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + } +} Property changes on: lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java (working copy) @@ -0,0 +1,157 @@ +package org.apache.lucene.index; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; + +/** Class that tracks changes to a delegated + * IndexWriter, used by {@link + * ControlledRealTimeReopenThread} to ensure specific + * changes are visible. Create this class (passing your + * IndexWriter), and then pass this class to {@link + * ControlledRealTimeReopenThread}. + * Be sure to make all changes via the + * TrackingIndexWriter, otherwise {@link + * ControlledRealTimeReopenThread} won't know about the changes. + * + * @lucene.experimental */ + +public class TrackingIndexWriter { + private final IndexWriter writer; + private final AtomicLong indexingGen = new AtomicLong(1); + + public TrackingIndexWriter(IndexWriter writer) { + this.writer = writer; + } + + public long updateDocument(Term t, IndexDocument d, Analyzer a) throws IOException { + writer.updateDocument(t, d, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocument(Term t, IndexDocument d) throws IOException { + writer.updateDocument(t, d); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocuments(Term t, Iterable docs, Analyzer a) throws IOException { + writer.updateDocuments(t, docs, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocuments(Term t, Iterable docs) throws IOException { + writer.updateDocuments(t, docs); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Term t) throws IOException { + writer.deleteDocuments(t); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Term... terms) throws IOException { + writer.deleteDocuments(terms); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Query q) throws IOException { + writer.deleteDocuments(q); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Query... queries) throws IOException { + writer.deleteDocuments(queries); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteAll() throws IOException { + writer.deleteAll(); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocument(IndexDocument d, Analyzer a) throws IOException { + writer.addDocument(d, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocuments(Iterable docs, Analyzer a) throws IOException { + writer.addDocuments(docs, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocument(IndexDocument d) throws IOException { + writer.addDocument(d); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocuments(Iterable docs) throws IOException { + writer.addDocuments(docs); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addIndexes(Directory... dirs) throws IOException { + writer.addIndexes(dirs); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addIndexes(IndexReader... readers) throws IOException { + writer.addIndexes(readers); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long getGeneration() { + return indexingGen.get(); + } + + public IndexWriter getIndexWriter() { + return writer; + } + + public long getAndIncrementGeneration() { + return indexingGen.getAndIncrement(); + } + + public long tryDeleteDocument(IndexReader reader, int docID) throws IOException { + if (writer.tryDeleteDocument(reader, docID)) { + return indexingGen.get(); + } else { + return -1; + } + } +} + Property changes on: lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java =================================================================== --- lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java (revision 1476852) +++ lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java (working copy) @@ -1,445 +0,0 @@ -package org.apache.lucene.search; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.MockAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexDocument; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.index.RandomIndexWriter; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.NRTCachingDirectory; -import org.apache.lucene.util.IOUtils; -import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; -import org.apache.lucene.util.ThreadInterruptedException; - -@SuppressCodecs({ "SimpleText", "Memory", "Direct" }) -public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase { - - private final ThreadLocal lastGens = new ThreadLocal(); - private boolean warmCalled; - - public void testNRTManager() throws Exception { - runTest("TestNRTManager"); - } - - @Override - protected IndexSearcher getFinalSearcher() throws Exception { - if (VERBOSE) { - System.out.println("TEST: finalSearcher maxGen=" + maxGen); - } - nrtDeletes.waitForGeneration(maxGen); - return nrtDeletes.acquire(); - } - - @Override - protected Directory getDirectory(Directory in) { - // Randomly swap in NRTCachingDir - if (random().nextBoolean()) { - if (VERBOSE) { - System.out.println("TEST: wrap NRTCachingDir"); - } - - return new NRTCachingDirectory(in, 5.0, 60.0); - } else { - return in; - } - } - - @Override - protected void updateDocuments(Term id, List docs) throws Exception { - final long gen = genWriter.updateDocuments(id, docs); - - // Randomly verify the update "took": - if (random().nextInt(20) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); - } - nrtDeletes.waitForGeneration(gen); - final IndexSearcher s = nrtDeletes.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); - } finally { - nrtDeletes.release(s); - } - } - - lastGens.set(gen); - } - - @Override - protected void addDocuments(Term id, List docs) throws Exception { - final long gen = genWriter.addDocuments(docs); - // Randomly verify the add "took": - if (random().nextInt(20) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); - } - nrtNoDeletes.waitForGeneration(gen); - final IndexSearcher s = nrtNoDeletes.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); - } finally { - nrtNoDeletes.release(s); - } - } - lastGens.set(gen); - } - - @Override - protected void addDocument(Term id, IndexDocument doc) throws Exception { - final long gen = genWriter.addDocument(doc); - - // Randomly verify the add "took": - if (random().nextInt(20) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); - } - nrtNoDeletes.waitForGeneration(gen); - final IndexSearcher s = nrtNoDeletes.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(1, s.search(new TermQuery(id), 10).totalHits); - } finally { - nrtNoDeletes.release(s); - } - } - lastGens.set(gen); - } - - @Override - protected void updateDocument(Term id, IndexDocument doc) throws Exception { - final long gen = genWriter.updateDocument(id, doc); - // Randomly verify the udpate "took": - if (random().nextInt(20) == 2) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); - } - nrtDeletes.waitForGeneration(gen); - final IndexSearcher s = nrtDeletes.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(1, s.search(new TermQuery(id), 10).totalHits); - } finally { - nrtDeletes.release(s); - } - } - lastGens.set(gen); - } - - @Override - protected void deleteDocuments(Term id) throws Exception { - final long gen = genWriter.deleteDocuments(id); - // randomly verify the delete "took": - if (random().nextInt(20) == 7) { - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); - } - nrtDeletes.waitForGeneration(gen); - final IndexSearcher s = nrtDeletes.acquire(); - if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); - } - try { - assertEquals(0, s.search(new TermQuery(id), 10).totalHits); - } finally { - nrtDeletes.release(s); - } - } - lastGens.set(gen); - } - - // Not guaranteed to reflect deletes: - private NRTManager nrtNoDeletes; - - // Is guaranteed to reflect deletes: - private NRTManager nrtDeletes; - - private NRTManager.TrackingIndexWriter genWriter; - - private NRTManagerReopenThread nrtDeletesThread; - private NRTManagerReopenThread nrtNoDeletesThread; - - @Override - protected void doAfterWriter(final ExecutorService es) throws Exception { - final double minReopenSec = 0.01 + 0.05 * random().nextDouble(); - final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble()); - - if (VERBOSE) { - System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); - } - - genWriter = new NRTManager.TrackingIndexWriter(writer); - - final SearcherFactory sf = new SearcherFactory() { - @Override - public IndexSearcher newSearcher(IndexReader r) throws IOException { - TestNRTManager.this.warmCalled = true; - IndexSearcher s = new IndexSearcher(r, es); - s.search(new TermQuery(new Term("body", "united")), 10); - return s; - } - }; - - nrtNoDeletes = new NRTManager(genWriter, sf, false); - nrtDeletes = new NRTManager(genWriter, sf, true); - - nrtDeletesThread = new NRTManagerReopenThread(nrtDeletes, maxReopenSec, minReopenSec); - nrtDeletesThread.setName("NRTDeletes Reopen Thread"); - nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); - nrtDeletesThread.setDaemon(true); - nrtDeletesThread.start(); - - nrtNoDeletesThread = new NRTManagerReopenThread(nrtNoDeletes, maxReopenSec, minReopenSec); - nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread"); - nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); - nrtNoDeletesThread.setDaemon(true); - nrtNoDeletesThread.start(); - } - - @Override - protected void doAfterIndexingThreadDone() { - Long gen = lastGens.get(); - if (gen != null) { - addMaxGen(gen); - } - } - - private long maxGen = -1; - - private synchronized void addMaxGen(long gen) { - maxGen = Math.max(gen, maxGen); - } - - @Override - protected void doSearching(ExecutorService es, long stopTime) throws Exception { - runSearchThreads(stopTime); - } - - @Override - protected IndexSearcher getCurrentSearcher() throws Exception { - // Test doesn't assert deletions until the end, so we - // can randomize whether dels must be applied - final NRTManager nrt; - if (random().nextBoolean()) { - nrt = nrtDeletes; - } else { - nrt = nrtNoDeletes; - } - - return nrt.acquire(); - } - - @Override - protected void releaseSearcher(IndexSearcher s) throws Exception { - // NOTE: a bit iffy... technically you should release - // against the same NRT mgr you acquired from... but - // both impls just decRef the underlying reader so we - // can get away w/ cheating: - nrtNoDeletes.release(s); - } - - @Override - protected void doClose() throws Exception { - assertTrue(warmCalled); - if (VERBOSE) { - System.out.println("TEST: now close NRTManager"); - } - nrtDeletesThread.close(); - nrtDeletes.close(); - nrtNoDeletesThread.close(); - nrtNoDeletes.close(); - } - - /* - * LUCENE-3528 - NRTManager hangs in certain situations - */ - public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException { - IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); - conf.setMergePolicy(random().nextBoolean() ? NoMergePolicy.COMPOUND_FILES : NoMergePolicy.NO_COMPOUND_FILES); - Directory d = newDirectory(); - final CountDownLatch latch = new CountDownLatch(1); - final CountDownLatch signal = new CountDownLatch(1); - - LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal); - final NRTManager.TrackingIndexWriter writer = new NRTManager.TrackingIndexWriter(_writer); - final NRTManager manager = new NRTManager(writer, null, false); - Document doc = new Document(); - doc.add(newTextField("test", "test", Field.Store.YES)); - long gen = writer.addDocument(doc); - manager.maybeRefresh(); - assertFalse(gen < manager.getCurrentSearchingGen()); - Thread t = new Thread() { - @Override - public void run() { - try { - signal.await(); - manager.maybeRefresh(); - writer.deleteDocuments(new TermQuery(new Term("foo", "barista"))); - manager.maybeRefresh(); // kick off another reopen so we inc. the internal gen - } catch (Exception e) { - e.printStackTrace(); - } finally { - latch.countDown(); // let the add below finish - } - } - }; - t.start(); - _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through - final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen - - assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue - - IndexSearcher searcher = manager.acquire(); - try { - assertEquals(2, searcher.getIndexReader().numDocs()); - } finally { - manager.release(searcher); - } - NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01); - thread.start(); // start reopening - if (VERBOSE) { - System.out.println("waiting now for generation " + lastGen); - } - - final AtomicBoolean finished = new AtomicBoolean(false); - Thread waiter = new Thread() { - @Override - public void run() { - manager.waitForGeneration(lastGen); - finished.set(true); - } - }; - waiter.start(); - manager.maybeRefresh(); - waiter.join(1000); - if (!finished.get()) { - waiter.interrupt(); - fail("thread deadlocked on waitForGeneration"); - } - thread.close(); - thread.join(); - IOUtils.close(manager, _writer, d); - } - - public static class LatchedIndexWriter extends IndexWriter { - - private CountDownLatch latch; - boolean waitAfterUpdate = false; - private CountDownLatch signal; - - public LatchedIndexWriter(Directory d, IndexWriterConfig conf, - CountDownLatch latch, CountDownLatch signal) - throws IOException { - super(d, conf); - this.latch = latch; - this.signal = signal; - - } - - @Override - public void updateDocument(Term term, - IndexDocument doc, Analyzer analyzer) - throws IOException { - super.updateDocument(term, doc, analyzer); - try { - if (waitAfterUpdate) { - signal.countDown(); - latch.await(); - } - } catch (InterruptedException e) { - throw new ThreadInterruptedException(e); - } - } - } - - public void testEvilSearcherFactory() throws Exception { - final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); - w.commit(); - - final IndexReader other = DirectoryReader.open(dir); - - final SearcherFactory theEvilOne = new SearcherFactory() { - @Override - public IndexSearcher newSearcher(IndexReader ignored) { - return LuceneTestCase.newSearcher(other); - } - }; - - try { - new NRTManager(new NRTManager.TrackingIndexWriter(w.w), theEvilOne); - } catch (IllegalStateException ise) { - // expected - } - w.close(); - other.close(); - dir.close(); - } - - public void testListenerCalled() throws Exception { - Directory dir = newDirectory(); - IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, null)); - final AtomicBoolean afterRefreshCalled = new AtomicBoolean(false); - NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory()); - sm.addListener(new ReferenceManager.RefreshListener() { - @Override - public void beforeRefresh() { - } - @Override - public void afterRefresh(boolean didRefresh) { - if (didRefresh) { - afterRefreshCalled.set(true); - } - } - }); - iw.addDocument(new Document()); - iw.commit(); - assertFalse(afterRefreshCalled.get()); - sm.maybeRefreshBlocking(); - assertTrue(afterRefreshCalled.get()); - sm.close(); - iw.close(); - dir.close(); - } -} Index: lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java =================================================================== --- lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java (revision 1476852) +++ lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java (working copy) @@ -25,7 +25,6 @@ import java.util.Locale; import java.util.Map; import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import org.apache.lucene.analysis.MockAnalyzer; @@ -36,10 +35,8 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.StoredDocument; import org.apache.lucene.index.Term; -import org.apache.lucene.search.NRTManager.TrackingIndexWriter; import org.apache.lucene.store.Directory; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util._TestUtil; @@ -50,10 +47,9 @@ Directory dir = newFSDirectory(_TestUtil.getTempDir("livefieldupdates")); IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); - final IndexWriter _w = new IndexWriter(dir, iwc); - final TrackingIndexWriter w = new TrackingIndexWriter(_w); + final IndexWriter w = new IndexWriter(dir, iwc); - final NRTManager mgr = new NRTManager(w, new SearcherFactory() { + final SearcherManager mgr = new SearcherManager(w, true, new SearcherFactory() { @Override public IndexSearcher newSearcher(IndexReader r) { return new IndexSearcher(r); @@ -174,7 +170,7 @@ rt.close(); mgr.close(); - _w.close(); + w.close(); dir.close(); } } Index: lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java =================================================================== --- lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java (working copy) +++ lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java (working copy) @@ -36,21 +36,33 @@ import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.Term; import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; +import org.apache.lucene.index.TrackingIndexWriter; import org.apache.lucene.store.Directory; import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; import org.apache.lucene.util.ThreadInterruptedException; @SuppressCodecs({ "SimpleText", "Memory", "Direct" }) -public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase { +public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearchingTestCase { + // Not guaranteed to reflect deletes: + private SearcherManager nrtNoDeletes; + + // Is guaranteed to reflect deletes: + private SearcherManager nrtDeletes; + + private TrackingIndexWriter genWriter; + + private ControlledRealTimeReopenThread nrtDeletesThread; + private ControlledRealTimeReopenThread nrtNoDeletesThread; + private final ThreadLocal lastGens = new ThreadLocal(); private boolean warmCalled; - public void testNRTManager() throws Exception { - runTest("TestNRTManager"); + public void testControlledRealTimeReopenThread() throws Exception { + runTest("TestControlledRealTimeReopenThread"); } @Override @@ -58,7 +70,7 @@ if (VERBOSE) { System.out.println("TEST: finalSearcher maxGen=" + maxGen); } - nrtDeletes.waitForGeneration(maxGen); + nrtDeletesThread.waitForGeneration(maxGen); return nrtDeletes.acquire(); } @@ -85,7 +97,7 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - nrtDeletes.waitForGeneration(gen); + nrtDeletesThread.waitForGeneration(gen); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); @@ -108,7 +120,7 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - nrtNoDeletes.waitForGeneration(gen); + nrtNoDeletesThread.waitForGeneration(gen); final IndexSearcher s = nrtNoDeletes.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); @@ -131,7 +143,7 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - nrtNoDeletes.waitForGeneration(gen); + nrtNoDeletesThread.waitForGeneration(gen); final IndexSearcher s = nrtNoDeletes.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); @@ -153,7 +165,7 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); } - nrtDeletes.waitForGeneration(gen); + nrtDeletesThread.waitForGeneration(gen); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); @@ -175,7 +187,7 @@ if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); } - nrtDeletes.waitForGeneration(gen); + nrtDeletesThread.waitForGeneration(gen); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); @@ -189,48 +201,37 @@ lastGens.set(gen); } - // Not guaranteed to reflect deletes: - private NRTManager nrtNoDeletes; - - // Is guaranteed to reflect deletes: - private NRTManager nrtDeletes; - - private NRTManager.TrackingIndexWriter genWriter; - - private NRTManagerReopenThread nrtDeletesThread; - private NRTManagerReopenThread nrtNoDeletesThread; - @Override protected void doAfterWriter(final ExecutorService es) throws Exception { final double minReopenSec = 0.01 + 0.05 * random().nextDouble(); final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble()); if (VERBOSE) { - System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); + System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); } - genWriter = new NRTManager.TrackingIndexWriter(writer); + genWriter = new TrackingIndexWriter(writer); final SearcherFactory sf = new SearcherFactory() { @Override public IndexSearcher newSearcher(IndexReader r) throws IOException { - TestNRTManager.this.warmCalled = true; + TestControlledRealTimeReopenThread.this.warmCalled = true; IndexSearcher s = new IndexSearcher(r, es); s.search(new TermQuery(new Term("body", "united")), 10); return s; } }; - nrtNoDeletes = new NRTManager(genWriter, sf, false); - nrtDeletes = new NRTManager(genWriter, sf, true); + nrtNoDeletes = new SearcherManager(writer, false, sf); + nrtDeletes = new SearcherManager(writer, true, sf); - nrtDeletesThread = new NRTManagerReopenThread(nrtDeletes, maxReopenSec, minReopenSec); + nrtDeletesThread = new ControlledRealTimeReopenThread(genWriter, nrtDeletes, maxReopenSec, minReopenSec); nrtDeletesThread.setName("NRTDeletes Reopen Thread"); nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); nrtDeletesThread.setDaemon(true); nrtDeletesThread.start(); - nrtNoDeletesThread = new NRTManagerReopenThread(nrtNoDeletes, maxReopenSec, minReopenSec); + nrtNoDeletesThread = new ControlledRealTimeReopenThread(genWriter, nrtNoDeletes, maxReopenSec, minReopenSec); nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread"); nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); nrtNoDeletesThread.setDaemon(true); @@ -260,7 +261,7 @@ protected IndexSearcher getCurrentSearcher() throws Exception { // Test doesn't assert deletions until the end, so we // can randomize whether dels must be applied - final NRTManager nrt; + final SearcherManager nrt; if (random().nextBoolean()) { nrt = nrtDeletes; } else { @@ -273,7 +274,7 @@ @Override protected void releaseSearcher(IndexSearcher s) throws Exception { // NOTE: a bit iffy... technically you should release - // against the same NRT mgr you acquired from... but + // against the same SearcherManager you acquired from... but // both impls just decRef the underlying reader so we // can get away w/ cheating: nrtNoDeletes.release(s); @@ -283,7 +284,7 @@ protected void doClose() throws Exception { assertTrue(warmCalled); if (VERBOSE) { - System.out.println("TEST: now close NRTManager"); + System.out.println("TEST: now close SearcherManagers"); } nrtDeletesThread.close(); nrtDeletes.close(); @@ -302,13 +303,12 @@ final CountDownLatch signal = new CountDownLatch(1); LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal); - final NRTManager.TrackingIndexWriter writer = new NRTManager.TrackingIndexWriter(_writer); - final NRTManager manager = new NRTManager(writer, null, false); + final TrackingIndexWriter writer = new TrackingIndexWriter(_writer); + final SearcherManager manager = new SearcherManager(_writer, false, null); Document doc = new Document(); doc.add(newTextField("test", "test", Field.Store.YES)); long gen = writer.addDocument(doc); manager.maybeRefresh(); - assertFalse(gen < manager.getCurrentSearchingGen()); Thread t = new Thread() { @Override public void run() { @@ -336,7 +336,7 @@ } finally { manager.release(searcher); } - NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01); + final ControlledRealTimeReopenThread thread = new ControlledRealTimeReopenThread(writer, manager, 0.01, 0.01); thread.start(); // start reopening if (VERBOSE) { System.out.println("waiting now for generation " + lastGen); @@ -346,7 +346,12 @@ Thread waiter = new Thread() { @Override public void run() { - manager.waitForGeneration(lastGen); + try { + thread.waitForGeneration(lastGen); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } finished.set(true); } }; @@ -408,7 +413,7 @@ }; try { - new NRTManager(new NRTManager.TrackingIndexWriter(w.w), theEvilOne); + new SearcherManager(w.w, false, theEvilOne); } catch (IllegalStateException ise) { // expected } @@ -421,7 +426,7 @@ Directory dir = newDirectory(); IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, null)); final AtomicBoolean afterRefreshCalled = new AtomicBoolean(false); - NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory()); + SearcherManager sm = new SearcherManager(iw, true, new SearcherFactory()); sm.addListener(new ReferenceManager.RefreshListener() { @Override public void beforeRefresh() {