Index: lucene/core/src/java/org/apache/lucene/search/NRTManagerReopenThread.java =================================================================== --- lucene/core/src/java/org/apache/lucene/search/NRTManagerReopenThread.java (revision 1476852) +++ lucene/core/src/java/org/apache/lucene/search/NRTManagerReopenThread.java (working copy) @@ -1,200 +0,0 @@ -package org.apache.lucene.search; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.lucene.util.ThreadInterruptedException; - -/** - * Utility class that runs a reopen thread to periodically - * reopen the NRT searchers in the provided {@link - * NRTManager}. - * - *
Typical usage looks like this: - * - *
- * ... open your own writer ...
- *
- * NRTManager manager = new NRTManager(writer);
- *
- * // Refreshes searcher every 5 seconds when nobody is waiting, and up to 100 msec delay
- * // when somebody is waiting:
- * NRTManagerReopenThread reopenThread = new NRTManagerReopenThread(manager, 5.0, 0.1);
- * reopenThread.setName("NRT Reopen Thread");
- * reopenThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
- * reopenThread.setDaemon(true);
- * reopenThread.start();
- *
- *
- * Then, for each incoming query, do this:
- *
- *
- * // For each incoming query:
- * IndexSearcher searcher = manager.get();
- * try {
- * // Use searcher to search...
- * } finally {
- * manager.release(searcher);
- * }
- *
- *
- * You should make changes using the NRTManager; if you later need to obtain
- * a searcher reflecting those changes:
- *
- *
- * // ... or updateDocument, deleteDocuments, etc:
- * long gen = manager.addDocument(...);
- *
- * // Returned searcher is guaranteed to reflect the just added document
- * IndexSearcher searcher = manager.get(gen);
- * try {
- * // Use searcher to search...
- * } finally {
- * manager.release(searcher);
- * }
- *
- *
- *
- * When you are done be sure to close both the manager and the reopen thread:
- * - * reopenThread.close(); - * manager.close(); - *- * - * @lucene.experimental - */ - -public class NRTManagerReopenThread extends Thread implements NRTManager.WaitingListener, Closeable { - - private final NRTManager manager; - private final long targetMaxStaleNS; - private final long targetMinStaleNS; - private boolean finish; - private long waitingGen; - - /** - * Create NRTManagerReopenThread, to periodically reopen the NRT searcher. - * - * @param targetMaxStaleSec Maximum time until a new - * reader must be opened; this sets the upper bound - * on how slowly reopens may occur - * - * @param targetMinStaleSec Mininum time until a new - * reader can be opened; this sets the lower bound - * on how quickly reopens may occur, when a caller - * is waiting for a specific indexing change to - * become visible. - */ - - public NRTManagerReopenThread(NRTManager manager, double targetMaxStaleSec, double targetMinStaleSec) { - if (targetMaxStaleSec < targetMinStaleSec) { - throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")"); - } - this.manager = manager; - this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec); - this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec); - manager.addWaitingListener(this); - } - - @Override - public synchronized void close() { - //System.out.println("NRT: set finish"); - manager.removeWaitingListener(this); - this.finish = true; - notify(); - try { - join(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - @Override - public synchronized void waiting(long targetGen) { - waitingGen = Math.max(waitingGen, targetGen); - notify(); - //System.out.println(Thread.currentThread().getName() + ": force wakeup waitingGen=" + waitingGen + " applyDeletes=" + applyDeletes); - } - - @Override - public void run() { - // TODO: maybe use private thread ticktock timer, in - // case clock shift messes up nanoTime? - long lastReopenStartNS = System.nanoTime(); - - //System.out.println("reopen: start"); - try { - while (true) { - - boolean hasWaiting = false; - - synchronized(this) { - // TODO: try to guestimate how long reopen might - // take based on past data? - - while (!finish) { - //System.out.println("reopen: cycle"); - - // True if we have someone waiting for reopen'd searcher: - hasWaiting = waitingGen > manager.getCurrentSearchingGen(); - final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); - - final long sleepNS = nextReopenStartNS - System.nanoTime(); - - if (sleepNS > 0) { - //System.out.println("reopen: sleep " + (sleepNS/1000000.0) + " ms (hasWaiting=" + hasWaiting + ")"); - try { - wait(sleepNS/1000000, (int) (sleepNS%1000000)); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - //System.out.println("NRT: set finish on interrupt"); - finish = true; - break; - } - } else { - break; - } - } - - if (finish) { - //System.out.println("reopen: finish"); - return; - } - //System.out.println("reopen: start hasWaiting=" + hasWaiting); - } - - lastReopenStartNS = System.nanoTime(); - try { - //final long t0 = System.nanoTime(); - manager.maybeRefresh(); - //System.out.println("reopen took " + ((System.nanoTime()-t0)/1000000.0) + " msec"); - } catch (IOException ioe) { - //System.out.println(Thread.currentThread().getName() + ": IOE"); - //ioe.printStackTrace(); - throw new RuntimeException(ioe); - } - } - } catch (Throwable t) { - //System.out.println("REOPEN EXC"); - //t.printStackTrace(System.out); - throw new RuntimeException(t); - } - } -} Index: lucene/core/src/java/org/apache/lucene/search/SearcherLifetimeManager.java =================================================================== --- lucene/core/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (revision 1476852) +++ lucene/core/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (working copy) @@ -24,7 +24,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import org.apache.lucene.search.NRTManager; // javadocs import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; @@ -41,8 +40,8 @@ * * Per search-request, if it's a "new" search request, then * obtain the latest searcher you have (for example, by - * using {@link SearcherManager} or {@link NRTManager}), and - * then record this searcher: + * using {@link SearcherManager}), and then record this + * searcher: * *
* // Record the current searcher, and save the returend
@@ -143,8 +142,7 @@
/** Records that you are now using this IndexSearcher.
* Always call this when you've obtained a possibly new
- * {@link IndexSearcher}, for example from one of the
- * get methods in {@link NRTManager} or {@link
+ * {@link IndexSearcher}, for example from {@link
* SearcherManager}. It's fine if you already passed the
* same searcher to this method before.
*
Index: lucene/core/src/java/org/apache/lucene/search/NRTManager.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/search/NRTManager.java (revision 1476852)
+++ lucene/core/src/java/org/apache/lucene/search/NRTManager.java (working copy)
@@ -1,404 +0,0 @@
-package org.apache.lucene.search;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexDocument;
-import org.apache.lucene.index.IndexReader; // javadocs
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher; // javadocs
-import org.apache.lucene.search.SearcherFactory; // javadocs
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.ThreadInterruptedException;
-
-/**
- * Utility class to manage sharing near-real-time searchers
- * across multiple searching thread. The difference vs
- * SearcherManager is that this class enables individual
- * requests to wait until specific indexing changes are
- * visible.
- *
- * You must create an IndexWriter, then create a {@link
- * NRTManager.TrackingIndexWriter} from it, and pass that to the
- * NRTManager. You may want to create two NRTManagers, once
- * that always applies deletes on refresh and one that does
- * not. In this case you should use a single {@link
- * NRTManager.TrackingIndexWriter} instance for both.
- *
- *
Then, use {@link #acquire} to obtain the
- * {@link IndexSearcher}, and {@link #release} (ideally,
- * from within a finally clause) to release it.
- *
- *
NOTE: to use this class, you must call {@link #maybeRefresh()}
- * periodically. The {@link NRTManagerReopenThread} is a
- * simple class to do this on a periodic basis, and reopens
- * more quickly if a request is waiting. If you implement
- * your own reopener, be sure to call {@link
- * #addWaitingListener} so your reopener is notified when a
- * caller is waiting for a specific generation
- * searcher.
- *
- * @see SearcherFactory
- *
- * @lucene.experimental
- */
-
-public final class NRTManager extends ReferenceManager {
- private static final long MAX_SEARCHER_GEN = Long.MAX_VALUE;
- private final TrackingIndexWriter writer;
- private final List waitingListeners = new CopyOnWriteArrayList();
- private final ReentrantLock genLock = new ReentrantLock();;
- private final Condition newGeneration = genLock.newCondition();
- private final SearcherFactory searcherFactory;
-
- private volatile long searchingGen;
-
- /**
- * Create new NRTManager.
- *
- * @param writer TrackingIndexWriter to open near-real-time
- * readers
- * @param searcherFactory An optional {@link SearcherFactory}. Pass
- * null if you don't require the searcher to be warmed
- * before going live or other custom behavior.
- */
- public NRTManager(TrackingIndexWriter writer, SearcherFactory searcherFactory) throws IOException {
- this(writer, searcherFactory, true);
- }
-
- /**
- * Expert: just like {@link
- * #NRTManager(TrackingIndexWriter,SearcherFactory)},
- * but you can also specify whether each reopened searcher must
- * apply deletes. This is useful for cases where certain
- * uses can tolerate seeing some deleted docs, since
- * reopen time is faster if deletes need not be applied. */
- public NRTManager(TrackingIndexWriter writer, SearcherFactory searcherFactory, boolean applyAllDeletes) throws IOException {
- this.writer = writer;
- if (searcherFactory == null) {
- searcherFactory = new SearcherFactory();
- }
- this.searcherFactory = searcherFactory;
- current = SearcherManager.getSearcher(searcherFactory, DirectoryReader.open(writer.getIndexWriter(), applyAllDeletes));
- }
-
- @Override
- protected void decRef(IndexSearcher reference) throws IOException {
- reference.getIndexReader().decRef();
- }
-
- @Override
- protected boolean tryIncRef(IndexSearcher reference) {
- return reference.getIndexReader().tryIncRef();
- }
-
- /** NRTManager invokes this interface to notify it when a
- * caller is waiting for a specific generation searcher
- * to be visible. */
- public static interface WaitingListener {
- public void waiting(long targetGen);
- }
-
- /** Adds a listener, to be notified when a caller is
- * waiting for a specific generation searcher to be
- * visible. */
- public void addWaitingListener(WaitingListener l) {
- waitingListeners.add(l);
- }
-
- /** Remove a listener added with {@link
- * #addWaitingListener}. */
- public void removeWaitingListener(WaitingListener l) {
- waitingListeners.remove(l);
- }
-
- /** Class that tracks changes to a delegated
- * IndexWriter. Create this class (passing your
- * IndexWriter), and then pass this class to NRTManager.
- * Be sure to make all changes via the
- * TrackingIndexWriter, otherwise NRTManager won't know
- * about the changes.
- *
- * @lucene.experimental */
- public static class TrackingIndexWriter {
- private final IndexWriter writer;
- private final AtomicLong indexingGen = new AtomicLong(1);
-
- public TrackingIndexWriter(IndexWriter writer) {
- this.writer = writer;
- }
-
- public long updateDocument(Term t, IndexDocument d, Analyzer a) throws IOException {
- writer.updateDocument(t, d, a);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long updateDocument(Term t, IndexDocument d) throws IOException {
- writer.updateDocument(t, d);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long updateDocuments(Term t, Iterable extends IndexDocument> docs, Analyzer a) throws IOException {
- writer.updateDocuments(t, docs, a);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long updateDocuments(Term t, Iterable extends IndexDocument> docs) throws IOException {
- writer.updateDocuments(t, docs);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long deleteDocuments(Term t) throws IOException {
- writer.deleteDocuments(t);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long deleteDocuments(Term... terms) throws IOException {
- writer.deleteDocuments(terms);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long deleteDocuments(Query q) throws IOException {
- writer.deleteDocuments(q);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long deleteDocuments(Query... queries) throws IOException {
- writer.deleteDocuments(queries);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long deleteAll() throws IOException {
- writer.deleteAll();
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long addDocument(IndexDocument d, Analyzer a) throws IOException {
- writer.addDocument(d, a);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long addDocuments(Iterable extends IndexDocument> docs, Analyzer a) throws IOException {
- writer.addDocuments(docs, a);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long addDocument(IndexDocument d) throws IOException {
- writer.addDocument(d);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long addDocuments(Iterable extends IndexDocument> docs) throws IOException {
- writer.addDocuments(docs);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long addIndexes(Directory... dirs) throws IOException {
- writer.addIndexes(dirs);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long addIndexes(IndexReader... readers) throws IOException {
- writer.addIndexes(readers);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- public long getGeneration() {
- return indexingGen.get();
- }
-
- public IndexWriter getIndexWriter() {
- return writer;
- }
-
- long getAndIncrementGeneration() {
- return indexingGen.getAndIncrement();
- }
-
- public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
- if (writer.tryDeleteDocument(reader, docID)) {
- return indexingGen.get();
- } else {
- return -1;
- }
- }
- }
-
- /**
- * Waits for the target generation to become visible in
- * the searcher.
- * If the current searcher is older than the
- * target generation, this method will block
- * until the searcher is reopened, by another via
- * {@link #maybeRefresh} or until the {@link NRTManager} is closed.
- *
- * @param targetGen the generation to wait for
- */
- public void waitForGeneration(long targetGen) {
- waitForGeneration(targetGen, -1, TimeUnit.NANOSECONDS);
- }
-
- /**
- * Waits for the target generation to become visible in
- * the searcher. If the current searcher is older than
- * the target generation, this method will block until the
- * searcher has been reopened by another thread via
- * {@link #maybeRefresh}, the given waiting time has elapsed, or until
- * the NRTManager is closed.
- *
- * NOTE: if the waiting time elapses before the requested target generation is
- * available the current {@link SearcherManager} is returned instead.
- *
- * @param targetGen
- * the generation to wait for
- * @param time
- * the time to wait for the target generation
- * @param unit
- * the waiting time's time unit
- */
- public void waitForGeneration(long targetGen, long time, TimeUnit unit) {
- try {
- final long curGen = writer.getGeneration();
- if (targetGen > curGen) {
- throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")");
- }
- genLock.lockInterruptibly();
- try {
- if (targetGen > searchingGen) {
- for (WaitingListener listener : waitingListeners) {
- listener.waiting(targetGen);
- }
- while (targetGen > searchingGen) {
- if (!waitOnGenCondition(time, unit)) {
- return;
- }
- }
- }
- } finally {
- genLock.unlock();
- }
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
-
- private boolean waitOnGenCondition(long time, TimeUnit unit)
- throws InterruptedException {
- assert genLock.isHeldByCurrentThread();
- if (time < 0) {
- newGeneration.await();
- return true;
- } else {
- return newGeneration.await(time, unit);
- }
- }
-
- /** Returns generation of current searcher. */
- public long getCurrentSearchingGen() {
- return searchingGen;
- }
-
- private long lastRefreshGen;
-
- @Override
- protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
- // Record gen as of when reopen started:
- lastRefreshGen = writer.getAndIncrementGeneration();
- final IndexReader r = referenceToRefresh.getIndexReader();
- assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r;
- final DirectoryReader dirReader = (DirectoryReader) r;
- IndexSearcher newSearcher = null;
- if (!dirReader.isCurrent()) {
- final IndexReader newReader = DirectoryReader.openIfChanged(dirReader);
- if (newReader != null) {
- newSearcher = SearcherManager.getSearcher(searcherFactory, newReader);
- }
- }
-
- return newSearcher;
- }
-
- @Override
- protected void afterMaybeRefresh() {
- genLock.lock();
- try {
- if (searchingGen != MAX_SEARCHER_GEN) {
- // update searchingGen:
- assert lastRefreshGen >= searchingGen;
- searchingGen = lastRefreshGen;
- }
- // wake up threads if we have a new generation:
- newGeneration.signalAll();
- } finally {
- genLock.unlock();
- }
- }
-
- @Override
- protected synchronized void afterClose() throws IOException {
- genLock.lock();
- try {
- // max it out to make sure nobody can wait on another gen
- searchingGen = MAX_SEARCHER_GEN;
- newGeneration.signalAll();
- } finally {
- genLock.unlock();
- }
- }
-
- /**
- * Returns true if no changes have occured since this searcher
- * ie. reader was opened, otherwise false.
- * @see DirectoryReader#isCurrent()
- */
- public boolean isSearcherCurrent() throws IOException {
- final IndexSearcher searcher = acquire();
- try {
- final IndexReader r = searcher.getIndexReader();
- assert r instanceof DirectoryReader: "searcher's IndexReader should be a DirectoryReader, but got " + r;
- return ((DirectoryReader) r).isCurrent();
- } finally {
- release(searcher);
- }
- }
-}
Index: lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java (revision 0)
+++ lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java (working copy)
@@ -0,0 +1,235 @@
+package org.apache.lucene.search;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.index.TrackingIndexWriter;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/** Utility class to manage reopens of a {@link
+ * ReferenceManager} with methods to wait for a specific
+ * generation to become visible. To use this class you
+ * must first wrap your {@link IndexWriter} with a {@link
+ * TrackingIndexWriter}. Always use the
+ * TrackingIndexWriter to make changes to the index, and
+ * save the returned generation. Then, when a given search
+ * request needs to see a specific index change, call the
+ * {#waitForGeneration} to wait for that change to be
+ * visible.
+ *
+ * @lucene.experimental */
+
+public class ControlledRealTimeReopenThread extends Thread implements Closeable {
+
+ private final ReferenceManager manager;
+ private final long targetMaxStaleNS;
+ private final long targetMinStaleNS;
+ private final TrackingIndexWriter writer;
+ private volatile boolean finish;
+ private volatile long waitingGen;
+ private volatile long searchingGen;
+ private long refreshStartGen;
+
+ private final ReentrantLock reopenLock = new ReentrantLock();
+ private final Condition reopenCond = reopenLock.newCondition();
+
+ /**
+ * Create ControlledRealTimeReopenThread, to periodically
+ * reopen the NRT searcher.
+ *
+ * @param targetMaxStaleSec Maximum time until a new
+ * reader must be opened; this sets the upper bound
+ * on how slowly reopens may occur, when no
+ * caller is waiting for a specific generation to
+ * become visible.
+ *
+ * @param targetMinStaleSec Mininum time until a new
+ * reader can be opened; this sets the lower bound
+ * on how quickly reopens may occur, when a caller
+ * is waiting for a specific generation to
+ * become visible.
+ */
+ public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager manager, double targetMaxStaleSec, double targetMinStaleSec) {
+ if (targetMaxStaleSec < targetMinStaleSec) {
+ throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
+ }
+ this.writer = writer;
+ this.manager = manager;
+ this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
+ this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
+ manager.addListener(new HandleRefresh());
+ }
+
+ private class HandleRefresh implements ReferenceManager.RefreshListener {
+ @Override
+ public void beforeRefresh() {
+ }
+
+ @Override
+ public void afterRefresh(boolean didRefresh) {
+ refreshDone(didRefresh);
+ }
+ }
+
+ private synchronized void refreshDone(boolean didRefresh) {
+ searchingGen = refreshStartGen;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void close() {
+ //System.out.println("NRT: set finish");
+
+ finish = true;
+
+ // So thread wakes up and notices it should finish:
+ reopenLock.lock();
+ try {
+ reopenCond.signal();
+ } finally {
+ reopenLock.unlock();
+ }
+
+ try {
+ join();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+
+ // Max it out so any waiting search threads will return:
+ searchingGen = Long.MAX_VALUE;
+ notifyAll();
+ }
+
+ /**
+ * Waits for the target generation to become visible in
+ * the searcher.
+ * If the current searcher is older than the
+ * target generation, this method will block
+ * until the searcher is reopened, by another via
+ * {@link #maybeRefresh} or until the {@link NRTManager} is closed.
+ *
+ * @param targetGen the generation to wait for
+ */
+ public void waitForGeneration(long targetGen) throws InterruptedException {
+ waitForGeneration(targetGen, -1);
+ }
+
+ /**
+ * Waits for the target generation to become visible in
+ * the searcher. If the current searcher is older than
+ * the target generation, this method will block until the
+ * searcher has been reopened by another thread via
+ * {@link #maybeRefresh}, the given waiting time has elapsed, or until
+ * the NRTManager is closed.
+ *
+ * NOTE: if the waiting time elapses before the requested target generation is
+ * available the current {@link SearcherManager} is returned instead.
+ *
+ * @param targetGen
+ * the generation to wait for
+ * @param maxMS
+ * maximum milliseconds to wait, or -1 to wait indefinitely
+ * @return true if the targetGeneration is now available,
+ * or false if maxMS wait time was exceeded
+ */
+ public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
+ final long curGen = writer.getGeneration();
+ if (targetGen > curGen) {
+ throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")");
+ }
+ if (targetGen > searchingGen) {
+ waitingGen = Math.max(waitingGen, targetGen);
+
+ // Notify the reopen thread that the waitingGen has
+ // changed, so it may wake up and realize it should
+ // not sleep for much or any longer before reopening:
+ reopenLock.lock();
+ try {
+ reopenCond.signal();
+ } finally {
+ reopenLock.unlock();
+ }
+
+ long startMS = System.nanoTime()/1000000;
+
+ while (targetGen > searchingGen) {
+ if (maxMS < 0) {
+ wait();
+ } else {
+ long msLeft = ((startMS + maxMS) - (System.nanoTime())/1000000);
+ if (msLeft <= 0) {
+ return false;
+ } else {
+ wait(msLeft);
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public void run() {
+ // TODO: maybe use private thread ticktock timer, in
+ // case clock shift messes up nanoTime?
+ long lastReopenStartNS = System.nanoTime();
+
+ //System.out.println("reopen: start");
+ while (!finish) {
+
+ // TODO: try to guestimate how long reopen might
+ // take based on past data?
+
+ // True if we have someone waiting for reopenedd searcher:
+ boolean hasWaiting = waitingGen > searchingGen;
+ final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS);
+
+ final long sleepNS = nextReopenStartNS - System.nanoTime();
+
+ if (sleepNS > 0) {
+ reopenLock.lock();
+ try {
+ reopenCond.awaitNanos(sleepNS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
+ } finally {
+ reopenLock.unlock();
+ }
+ }
+
+ if (finish) {
+ break;
+ }
+
+ lastReopenStartNS = System.nanoTime();
+ refreshStartGen = writer.getAndIncrementGeneration();
+ try {
+ manager.maybeRefreshBlocking();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+}
Property changes on: lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java (revision 0)
+++ lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java (working copy)
@@ -0,0 +1,157 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.store.Directory;
+
+/** Class that tracks changes to a delegated
+ * IndexWriter, used by {@link
+ * ControlledRealTimeReopenThread} to ensure specific
+ * changes are visible. Create this class (passing your
+ * IndexWriter), and then pass this class to {@link
+ * ControlledRealTimeReopenThread}.
+ * Be sure to make all changes via the
+ * TrackingIndexWriter, otherwise {@link
+ * ControlledRealTimeReopenThread} won't know about the changes.
+ *
+ * @lucene.experimental */
+
+public class TrackingIndexWriter {
+ private final IndexWriter writer;
+ private final AtomicLong indexingGen = new AtomicLong(1);
+
+ public TrackingIndexWriter(IndexWriter writer) {
+ this.writer = writer;
+ }
+
+ public long updateDocument(Term t, IndexDocument d, Analyzer a) throws IOException {
+ writer.updateDocument(t, d, a);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long updateDocument(Term t, IndexDocument d) throws IOException {
+ writer.updateDocument(t, d);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long updateDocuments(Term t, Iterable extends IndexDocument> docs, Analyzer a) throws IOException {
+ writer.updateDocuments(t, docs, a);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long updateDocuments(Term t, Iterable extends IndexDocument> docs) throws IOException {
+ writer.updateDocuments(t, docs);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long deleteDocuments(Term t) throws IOException {
+ writer.deleteDocuments(t);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long deleteDocuments(Term... terms) throws IOException {
+ writer.deleteDocuments(terms);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long deleteDocuments(Query q) throws IOException {
+ writer.deleteDocuments(q);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long deleteDocuments(Query... queries) throws IOException {
+ writer.deleteDocuments(queries);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long deleteAll() throws IOException {
+ writer.deleteAll();
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long addDocument(IndexDocument d, Analyzer a) throws IOException {
+ writer.addDocument(d, a);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long addDocuments(Iterable extends IndexDocument> docs, Analyzer a) throws IOException {
+ writer.addDocuments(docs, a);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long addDocument(IndexDocument d) throws IOException {
+ writer.addDocument(d);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long addDocuments(Iterable extends IndexDocument> docs) throws IOException {
+ writer.addDocuments(docs);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long addIndexes(Directory... dirs) throws IOException {
+ writer.addIndexes(dirs);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long addIndexes(IndexReader... readers) throws IOException {
+ writer.addIndexes(readers);
+ // Return gen as of when indexing finished:
+ return indexingGen.get();
+ }
+
+ public long getGeneration() {
+ return indexingGen.get();
+ }
+
+ public IndexWriter getIndexWriter() {
+ return writer;
+ }
+
+ public long getAndIncrementGeneration() {
+ return indexingGen.getAndIncrement();
+ }
+
+ public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
+ if (writer.tryDeleteDocument(reader, docID)) {
+ return indexingGen.get();
+ } else {
+ return -1;
+ }
+ }
+}
+
Property changes on: lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java (revision 1476852)
+++ lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java (working copy)
@@ -1,445 +0,0 @@
-package org.apache.lucene.search;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.MockAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexDocument;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.NoMergePolicy;
-import org.apache.lucene.index.RandomIndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.NRTCachingDirectory;
-import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
-import org.apache.lucene.util.ThreadInterruptedException;
-
-@SuppressCodecs({ "SimpleText", "Memory", "Direct" })
-public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
-
- private final ThreadLocal lastGens = new ThreadLocal();
- private boolean warmCalled;
-
- public void testNRTManager() throws Exception {
- runTest("TestNRTManager");
- }
-
- @Override
- protected IndexSearcher getFinalSearcher() throws Exception {
- if (VERBOSE) {
- System.out.println("TEST: finalSearcher maxGen=" + maxGen);
- }
- nrtDeletes.waitForGeneration(maxGen);
- return nrtDeletes.acquire();
- }
-
- @Override
- protected Directory getDirectory(Directory in) {
- // Randomly swap in NRTCachingDir
- if (random().nextBoolean()) {
- if (VERBOSE) {
- System.out.println("TEST: wrap NRTCachingDir");
- }
-
- return new NRTCachingDirectory(in, 5.0, 60.0);
- } else {
- return in;
- }
- }
-
- @Override
- protected void updateDocuments(Term id, List extends IndexDocument> docs) throws Exception {
- final long gen = genWriter.updateDocuments(id, docs);
-
- // Randomly verify the update "took":
- if (random().nextInt(20) == 2) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
- }
- nrtDeletes.waitForGeneration(gen);
- final IndexSearcher s = nrtDeletes.acquire();
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
- }
- try {
- assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
- } finally {
- nrtDeletes.release(s);
- }
- }
-
- lastGens.set(gen);
- }
-
- @Override
- protected void addDocuments(Term id, List extends IndexDocument> docs) throws Exception {
- final long gen = genWriter.addDocuments(docs);
- // Randomly verify the add "took":
- if (random().nextInt(20) == 2) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
- }
- nrtNoDeletes.waitForGeneration(gen);
- final IndexSearcher s = nrtNoDeletes.acquire();
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
- }
- try {
- assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
- } finally {
- nrtNoDeletes.release(s);
- }
- }
- lastGens.set(gen);
- }
-
- @Override
- protected void addDocument(Term id, IndexDocument doc) throws Exception {
- final long gen = genWriter.addDocument(doc);
-
- // Randomly verify the add "took":
- if (random().nextInt(20) == 2) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
- }
- nrtNoDeletes.waitForGeneration(gen);
- final IndexSearcher s = nrtNoDeletes.acquire();
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
- }
- try {
- assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
- } finally {
- nrtNoDeletes.release(s);
- }
- }
- lastGens.set(gen);
- }
-
- @Override
- protected void updateDocument(Term id, IndexDocument doc) throws Exception {
- final long gen = genWriter.updateDocument(id, doc);
- // Randomly verify the udpate "took":
- if (random().nextInt(20) == 2) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
- }
- nrtDeletes.waitForGeneration(gen);
- final IndexSearcher s = nrtDeletes.acquire();
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
- }
- try {
- assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
- } finally {
- nrtDeletes.release(s);
- }
- }
- lastGens.set(gen);
- }
-
- @Override
- protected void deleteDocuments(Term id) throws Exception {
- final long gen = genWriter.deleteDocuments(id);
- // randomly verify the delete "took":
- if (random().nextInt(20) == 7) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
- }
- nrtDeletes.waitForGeneration(gen);
- final IndexSearcher s = nrtDeletes.acquire();
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
- }
- try {
- assertEquals(0, s.search(new TermQuery(id), 10).totalHits);
- } finally {
- nrtDeletes.release(s);
- }
- }
- lastGens.set(gen);
- }
-
- // Not guaranteed to reflect deletes:
- private NRTManager nrtNoDeletes;
-
- // Is guaranteed to reflect deletes:
- private NRTManager nrtDeletes;
-
- private NRTManager.TrackingIndexWriter genWriter;
-
- private NRTManagerReopenThread nrtDeletesThread;
- private NRTManagerReopenThread nrtNoDeletesThread;
-
- @Override
- protected void doAfterWriter(final ExecutorService es) throws Exception {
- final double minReopenSec = 0.01 + 0.05 * random().nextDouble();
- final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble());
-
- if (VERBOSE) {
- System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
- }
-
- genWriter = new NRTManager.TrackingIndexWriter(writer);
-
- final SearcherFactory sf = new SearcherFactory() {
- @Override
- public IndexSearcher newSearcher(IndexReader r) throws IOException {
- TestNRTManager.this.warmCalled = true;
- IndexSearcher s = new IndexSearcher(r, es);
- s.search(new TermQuery(new Term("body", "united")), 10);
- return s;
- }
- };
-
- nrtNoDeletes = new NRTManager(genWriter, sf, false);
- nrtDeletes = new NRTManager(genWriter, sf, true);
-
- nrtDeletesThread = new NRTManagerReopenThread(nrtDeletes, maxReopenSec, minReopenSec);
- nrtDeletesThread.setName("NRTDeletes Reopen Thread");
- nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
- nrtDeletesThread.setDaemon(true);
- nrtDeletesThread.start();
-
- nrtNoDeletesThread = new NRTManagerReopenThread(nrtNoDeletes, maxReopenSec, minReopenSec);
- nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread");
- nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
- nrtNoDeletesThread.setDaemon(true);
- nrtNoDeletesThread.start();
- }
-
- @Override
- protected void doAfterIndexingThreadDone() {
- Long gen = lastGens.get();
- if (gen != null) {
- addMaxGen(gen);
- }
- }
-
- private long maxGen = -1;
-
- private synchronized void addMaxGen(long gen) {
- maxGen = Math.max(gen, maxGen);
- }
-
- @Override
- protected void doSearching(ExecutorService es, long stopTime) throws Exception {
- runSearchThreads(stopTime);
- }
-
- @Override
- protected IndexSearcher getCurrentSearcher() throws Exception {
- // Test doesn't assert deletions until the end, so we
- // can randomize whether dels must be applied
- final NRTManager nrt;
- if (random().nextBoolean()) {
- nrt = nrtDeletes;
- } else {
- nrt = nrtNoDeletes;
- }
-
- return nrt.acquire();
- }
-
- @Override
- protected void releaseSearcher(IndexSearcher s) throws Exception {
- // NOTE: a bit iffy... technically you should release
- // against the same NRT mgr you acquired from... but
- // both impls just decRef the underlying reader so we
- // can get away w/ cheating:
- nrtNoDeletes.release(s);
- }
-
- @Override
- protected void doClose() throws Exception {
- assertTrue(warmCalled);
- if (VERBOSE) {
- System.out.println("TEST: now close NRTManager");
- }
- nrtDeletesThread.close();
- nrtDeletes.close();
- nrtNoDeletesThread.close();
- nrtNoDeletes.close();
- }
-
- /*
- * LUCENE-3528 - NRTManager hangs in certain situations
- */
- public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException {
- IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
- conf.setMergePolicy(random().nextBoolean() ? NoMergePolicy.COMPOUND_FILES : NoMergePolicy.NO_COMPOUND_FILES);
- Directory d = newDirectory();
- final CountDownLatch latch = new CountDownLatch(1);
- final CountDownLatch signal = new CountDownLatch(1);
-
- LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
- final NRTManager.TrackingIndexWriter writer = new NRTManager.TrackingIndexWriter(_writer);
- final NRTManager manager = new NRTManager(writer, null, false);
- Document doc = new Document();
- doc.add(newTextField("test", "test", Field.Store.YES));
- long gen = writer.addDocument(doc);
- manager.maybeRefresh();
- assertFalse(gen < manager.getCurrentSearchingGen());
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- signal.await();
- manager.maybeRefresh();
- writer.deleteDocuments(new TermQuery(new Term("foo", "barista")));
- manager.maybeRefresh(); // kick off another reopen so we inc. the internal gen
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- latch.countDown(); // let the add below finish
- }
- }
- };
- t.start();
- _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
- final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
-
- assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue
-
- IndexSearcher searcher = manager.acquire();
- try {
- assertEquals(2, searcher.getIndexReader().numDocs());
- } finally {
- manager.release(searcher);
- }
- NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01);
- thread.start(); // start reopening
- if (VERBOSE) {
- System.out.println("waiting now for generation " + lastGen);
- }
-
- final AtomicBoolean finished = new AtomicBoolean(false);
- Thread waiter = new Thread() {
- @Override
- public void run() {
- manager.waitForGeneration(lastGen);
- finished.set(true);
- }
- };
- waiter.start();
- manager.maybeRefresh();
- waiter.join(1000);
- if (!finished.get()) {
- waiter.interrupt();
- fail("thread deadlocked on waitForGeneration");
- }
- thread.close();
- thread.join();
- IOUtils.close(manager, _writer, d);
- }
-
- public static class LatchedIndexWriter extends IndexWriter {
-
- private CountDownLatch latch;
- boolean waitAfterUpdate = false;
- private CountDownLatch signal;
-
- public LatchedIndexWriter(Directory d, IndexWriterConfig conf,
- CountDownLatch latch, CountDownLatch signal)
- throws IOException {
- super(d, conf);
- this.latch = latch;
- this.signal = signal;
-
- }
-
- @Override
- public void updateDocument(Term term,
- IndexDocument doc, Analyzer analyzer)
- throws IOException {
- super.updateDocument(term, doc, analyzer);
- try {
- if (waitAfterUpdate) {
- signal.countDown();
- latch.await();
- }
- } catch (InterruptedException e) {
- throw new ThreadInterruptedException(e);
- }
- }
- }
-
- public void testEvilSearcherFactory() throws Exception {
- final Directory dir = newDirectory();
- final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
- w.commit();
-
- final IndexReader other = DirectoryReader.open(dir);
-
- final SearcherFactory theEvilOne = new SearcherFactory() {
- @Override
- public IndexSearcher newSearcher(IndexReader ignored) {
- return LuceneTestCase.newSearcher(other);
- }
- };
-
- try {
- new NRTManager(new NRTManager.TrackingIndexWriter(w.w), theEvilOne);
- } catch (IllegalStateException ise) {
- // expected
- }
- w.close();
- other.close();
- dir.close();
- }
-
- public void testListenerCalled() throws Exception {
- Directory dir = newDirectory();
- IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, null));
- final AtomicBoolean afterRefreshCalled = new AtomicBoolean(false);
- NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory());
- sm.addListener(new ReferenceManager.RefreshListener() {
- @Override
- public void beforeRefresh() {
- }
- @Override
- public void afterRefresh(boolean didRefresh) {
- if (didRefresh) {
- afterRefreshCalled.set(true);
- }
- }
- });
- iw.addDocument(new Document());
- iw.commit();
- assertFalse(afterRefreshCalled.get());
- sm.maybeRefreshBlocking();
- assertTrue(afterRefreshCalled.get());
- sm.close();
- iw.close();
- dir.close();
- }
-}
Index: lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java (revision 1476852)
+++ lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java (working copy)
@@ -25,7 +25,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.lucene.analysis.MockAnalyzer;
@@ -36,10 +35,8 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.StoredDocument;
import org.apache.lucene.index.Term;
-import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
@@ -50,10 +47,9 @@
Directory dir = newFSDirectory(_TestUtil.getTempDir("livefieldupdates"));
IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
- final IndexWriter _w = new IndexWriter(dir, iwc);
- final TrackingIndexWriter w = new TrackingIndexWriter(_w);
+ final IndexWriter w = new IndexWriter(dir, iwc);
- final NRTManager mgr = new NRTManager(w, new SearcherFactory() {
+ final SearcherManager mgr = new SearcherManager(w, true, new SearcherFactory() {
@Override
public IndexSearcher newSearcher(IndexReader r) {
return new IndexSearcher(r);
@@ -174,7 +170,7 @@
rt.close();
mgr.close();
- _w.close();
+ w.close();
dir.close();
}
}
Index: lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java (working copy)
+++ lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java (working copy)
@@ -36,21 +36,33 @@
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
+import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.ThreadInterruptedException;
@SuppressCodecs({ "SimpleText", "Memory", "Direct" })
-public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
+public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearchingTestCase {
+ // Not guaranteed to reflect deletes:
+ private SearcherManager nrtNoDeletes;
+
+ // Is guaranteed to reflect deletes:
+ private SearcherManager nrtDeletes;
+
+ private TrackingIndexWriter genWriter;
+
+ private ControlledRealTimeReopenThread nrtDeletesThread;
+ private ControlledRealTimeReopenThread nrtNoDeletesThread;
+
private final ThreadLocal lastGens = new ThreadLocal();
private boolean warmCalled;
- public void testNRTManager() throws Exception {
- runTest("TestNRTManager");
+ public void testControlledRealTimeReopenThread() throws Exception {
+ runTest("TestControlledRealTimeReopenThread");
}
@Override
@@ -58,7 +70,7 @@
if (VERBOSE) {
System.out.println("TEST: finalSearcher maxGen=" + maxGen);
}
- nrtDeletes.waitForGeneration(maxGen);
+ nrtDeletesThread.waitForGeneration(maxGen);
return nrtDeletes.acquire();
}
@@ -85,7 +97,7 @@
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
- nrtDeletes.waitForGeneration(gen);
+ nrtDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@@ -108,7 +120,7 @@
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
- nrtNoDeletes.waitForGeneration(gen);
+ nrtNoDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtNoDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@@ -131,7 +143,7 @@
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
- nrtNoDeletes.waitForGeneration(gen);
+ nrtNoDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtNoDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@@ -153,7 +165,7 @@
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
}
- nrtDeletes.waitForGeneration(gen);
+ nrtDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@@ -175,7 +187,7 @@
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
}
- nrtDeletes.waitForGeneration(gen);
+ nrtDeletesThread.waitForGeneration(gen);
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
@@ -189,48 +201,37 @@
lastGens.set(gen);
}
- // Not guaranteed to reflect deletes:
- private NRTManager nrtNoDeletes;
-
- // Is guaranteed to reflect deletes:
- private NRTManager nrtDeletes;
-
- private NRTManager.TrackingIndexWriter genWriter;
-
- private NRTManagerReopenThread nrtDeletesThread;
- private NRTManagerReopenThread nrtNoDeletesThread;
-
@Override
protected void doAfterWriter(final ExecutorService es) throws Exception {
final double minReopenSec = 0.01 + 0.05 * random().nextDouble();
final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble());
if (VERBOSE) {
- System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
+ System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
}
- genWriter = new NRTManager.TrackingIndexWriter(writer);
+ genWriter = new TrackingIndexWriter(writer);
final SearcherFactory sf = new SearcherFactory() {
@Override
public IndexSearcher newSearcher(IndexReader r) throws IOException {
- TestNRTManager.this.warmCalled = true;
+ TestControlledRealTimeReopenThread.this.warmCalled = true;
IndexSearcher s = new IndexSearcher(r, es);
s.search(new TermQuery(new Term("body", "united")), 10);
return s;
}
};
- nrtNoDeletes = new NRTManager(genWriter, sf, false);
- nrtDeletes = new NRTManager(genWriter, sf, true);
+ nrtNoDeletes = new SearcherManager(writer, false, sf);
+ nrtDeletes = new SearcherManager(writer, true, sf);
- nrtDeletesThread = new NRTManagerReopenThread(nrtDeletes, maxReopenSec, minReopenSec);
+ nrtDeletesThread = new ControlledRealTimeReopenThread(genWriter, nrtDeletes, maxReopenSec, minReopenSec);
nrtDeletesThread.setName("NRTDeletes Reopen Thread");
nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
nrtDeletesThread.setDaemon(true);
nrtDeletesThread.start();
- nrtNoDeletesThread = new NRTManagerReopenThread(nrtNoDeletes, maxReopenSec, minReopenSec);
+ nrtNoDeletesThread = new ControlledRealTimeReopenThread(genWriter, nrtNoDeletes, maxReopenSec, minReopenSec);
nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread");
nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
nrtNoDeletesThread.setDaemon(true);
@@ -260,7 +261,7 @@
protected IndexSearcher getCurrentSearcher() throws Exception {
// Test doesn't assert deletions until the end, so we
// can randomize whether dels must be applied
- final NRTManager nrt;
+ final SearcherManager nrt;
if (random().nextBoolean()) {
nrt = nrtDeletes;
} else {
@@ -273,7 +274,7 @@
@Override
protected void releaseSearcher(IndexSearcher s) throws Exception {
// NOTE: a bit iffy... technically you should release
- // against the same NRT mgr you acquired from... but
+ // against the same SearcherManager you acquired from... but
// both impls just decRef the underlying reader so we
// can get away w/ cheating:
nrtNoDeletes.release(s);
@@ -283,7 +284,7 @@
protected void doClose() throws Exception {
assertTrue(warmCalled);
if (VERBOSE) {
- System.out.println("TEST: now close NRTManager");
+ System.out.println("TEST: now close SearcherManagers");
}
nrtDeletesThread.close();
nrtDeletes.close();
@@ -302,13 +303,12 @@
final CountDownLatch signal = new CountDownLatch(1);
LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
- final NRTManager.TrackingIndexWriter writer = new NRTManager.TrackingIndexWriter(_writer);
- final NRTManager manager = new NRTManager(writer, null, false);
+ final TrackingIndexWriter writer = new TrackingIndexWriter(_writer);
+ final SearcherManager manager = new SearcherManager(_writer, false, null);
Document doc = new Document();
doc.add(newTextField("test", "test", Field.Store.YES));
long gen = writer.addDocument(doc);
manager.maybeRefresh();
- assertFalse(gen < manager.getCurrentSearchingGen());
Thread t = new Thread() {
@Override
public void run() {
@@ -336,7 +336,7 @@
} finally {
manager.release(searcher);
}
- NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01);
+ final ControlledRealTimeReopenThread thread = new ControlledRealTimeReopenThread(writer, manager, 0.01, 0.01);
thread.start(); // start reopening
if (VERBOSE) {
System.out.println("waiting now for generation " + lastGen);
@@ -346,7 +346,12 @@
Thread waiter = new Thread() {
@Override
public void run() {
- manager.waitForGeneration(lastGen);
+ try {
+ thread.waitForGeneration(lastGen);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
finished.set(true);
}
};
@@ -408,7 +413,7 @@
};
try {
- new NRTManager(new NRTManager.TrackingIndexWriter(w.w), theEvilOne);
+ new SearcherManager(w.w, false, theEvilOne);
} catch (IllegalStateException ise) {
// expected
}
@@ -421,7 +426,7 @@
Directory dir = newDirectory();
IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, null));
final AtomicBoolean afterRefreshCalled = new AtomicBoolean(false);
- NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory());
+ SearcherManager sm = new SearcherManager(iw, true, new SearcherFactory());
sm.addListener(new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() {