Index: src/java/org/apache/lucene/index/TimeLimitingIndexReader.java
===================================================================
--- src/java/org/apache/lucene/index/TimeLimitingIndexReader.java (revision 0)
+++ src/java/org/apache/lucene/index/TimeLimitingIndexReader.java (revision 0)
@@ -0,0 +1,239 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.FieldSelector;
+import org.apache.lucene.util.ActivityTimeMonitor;
+import org.apache.lucene.util.ActivityTimedOutException;
+
+/**
+ * IndexReader that checks whether client threads using it have over-run a
+ * maximum allotted time. Clients use the {@link ActivityTimeMonitor} class's
+ * start and stop methods to define the scope of a time-limited activity, and
+ * this class wraps all calls to the underlying index with calls to
+ * ActivityTimeMonitor's checkForTimeout method, which throws the runtime
+ * exception {@link ActivityTimedOutException} in the event of an activity
+ * over-run.
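+ * <p>
+ * A minimal usage sketch (the directory, query, collector and 5 second budget
+ * below are illustrative only):
+ * <pre>
+ * IndexReader reader = new TimeLimitingIndexReader(IndexReader.open(dir, true));
+ * IndexSearcher searcher = new IndexSearcher(reader);
+ * ActivityTimeMonitor.start(5000); // allow at most 5 seconds
+ * try {
+ *   searcher.search(query, collector); // may throw ActivityTimedOutException
+ * } finally {
+ *   ActivityTimeMonitor.stop();
+ * }
+ * </pre>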
+ */
+public class TimeLimitingIndexReader extends FilterIndexReader {
+
+ // TermDocs with timeout safety checks
+ private static class TimeLimitedTermDocs implements TermDocs {
+
+ private TermDocs termDocs;
+
+ public TimeLimitedTermDocs(TermDocs termDocs) {
+ this.termDocs = termDocs;
+ }
+
+ public void close() throws IOException {
+ termDocs.close();
+ }
+
+ public int doc() {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.doc();
+ }
+
+ public int freq() {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.freq();
+ }
+
+ public boolean next() throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.next();
+ }
+
+ public int read(int[] docs, int[] freqs) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.read(docs, freqs);
+ }
+
+ public void seek(Term term) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ termDocs.seek(term);
+ }
+
+ public void seek(TermEnum termEnum) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ termDocs.seek(termEnum);
+ }
+
+ public boolean skipTo(int target) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.skipTo(target);
+ }
+ }
+
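+ // TermEnum with timeout safety checks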
+ private static class TimeLimitedTermEnum extends TermEnum {
+
+ private TermEnum terms;
+
+ public TimeLimitedTermEnum(TermEnum terms) {
+ this.terms = terms;
+ }
+
+ @Override
+ public void close() throws IOException {
+ terms.close();
+ }
+
+ @Override
+ public int docFreq() {
+ ActivityTimeMonitor.checkForTimeout();
+ return terms.docFreq();
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return terms.next();
+ }
+
+ @Override
+ public Term term() {
+ ActivityTimeMonitor.checkForTimeout();
+ return terms.term();
+ }
+
+ }
+
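+ // TermPositions with timeout safety checks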
+ private static class TimeLimitedTermPositions extends TimeLimitedTermDocs
+ implements TermPositions {
+
+ private TermPositions tp;
+
+ public TimeLimitedTermPositions(TermPositions termPositions) {
+ super(termPositions);
+ this.tp = termPositions;
+ }
+
+ public byte[] getPayload(byte[] data, int offset) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return tp.getPayload(data, offset);
+ }
+
+ public int getPayloadLength() {
+ ActivityTimeMonitor.checkForTimeout();
+ return tp.getPayloadLength();
+ }
+
+ public boolean isPayloadAvailable() {
+ ActivityTimeMonitor.checkForTimeout();
+ return tp.isPayloadAvailable();
+ }
+
+ public int nextPosition() throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return tp.nextPosition();
+ }
+ }
+
+ private boolean isSubReader;
+
+ public TimeLimitingIndexReader(IndexReader in) {
+ super(in);
+ }
+
+ public TimeLimitingIndexReader(IndexReader indexReader, boolean isSubReader) {
+ super(indexReader);
+ this.isSubReader = isSubReader;
+ }
+
+ @Override
+ public synchronized IndexReader reopen() throws CorruptIndexException, IOException {
+ IndexReader result = in.reopen();
+ if (in != result) { // really did reopen
+ result = new TimeLimitingIndexReader(result);
+ }
+ return result;
+ }
+
+ @Override
+ public IndexCommit getIndexCommit() throws IOException {
+ return in.getIndexCommit();
+ }
+
+
+ @Override
+ public int docFreq(Term t) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return super.docFreq(t);
+ }
+
+ @Override
+ public Document document(int n) throws CorruptIndexException, IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return super.document(n);
+ }
+
+ @Override
+ public Document document(int n, FieldSelector fieldSelector)
+ throws CorruptIndexException, IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return super.document(n, fieldSelector);
+ }
+
+ @Override
+ public IndexReader[] getSequentialSubReaders() {
+ if (isSubReader)
+ return null;
+ IndexReader[] results = in.getSequentialSubReaders();
+ if (results == null) {
+ // underlying reader exposes no sub-readers (e.g. a lone SegmentReader)
+ return null;
+ }
+ IndexReader[] tlResults = new IndexReader[results.length];
+ for (int i = 0; i < results.length; i++) {
+ tlResults[i] = new TimeLimitingIndexReader(results[i], true);
+ }
+ return tlResults;
+ }
+
+ @Override
+ public TermDocs termDocs() throws IOException {
+ return new TimeLimitedTermDocs(super.termDocs());
+ }
+
+ @Override
+ public TermDocs termDocs(Term term) throws IOException {
+ return new TimeLimitedTermDocs(super.termDocs(term));
+ }
+
+ @Override
+ public TermPositions termPositions() throws IOException {
+ return new TimeLimitedTermPositions(super.termPositions());
+ }
+
+ @Override
+ public TermPositions termPositions(Term term) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return new TimeLimitedTermPositions(super.termPositions(term));
+ }
+
+ @Override
+ public TermEnum terms() throws IOException {
+ return new TimeLimitedTermEnum(super.terms());
+ }
+
+ @Override
+ public TermEnum terms(Term t) throws IOException {
+ return new TimeLimitedTermEnum(super.terms(t));
+ }
+
+}
Index: src/java/org/apache/lucene/util/ActivityTimedOutException.java
===================================================================
--- src/java/org/apache/lucene/util/ActivityTimedOutException.java (revision 0)
+++ src/java/org/apache/lucene/util/ActivityTimedOutException.java (revision 0)
@@ -0,0 +1,33 @@
+package org.apache.lucene.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class ActivityTimedOutException extends RuntimeException {
+
+ private long overrunMilliseconds;
+
+ public ActivityTimedOutException(String msg, long overrun) {
+ super(msg);
+ overrunMilliseconds = overrun;
+ }
+
+ public long getOverrunMilliseconds() {
+ return overrunMilliseconds;
+ }
+
+}
Index: src/java/org/apache/lucene/util/ActivityTimeMonitor.java
===================================================================
--- src/java/org/apache/lucene/util/ActivityTimeMonitor.java (revision 0)
+++ src/java/org/apache/lucene/util/ActivityTimeMonitor.java (revision 0)
@@ -0,0 +1,269 @@
+package org.apache.lucene.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.WeakHashMap;
+import java.util.Map.Entry;
+
+/**
+ * Utility class which efficiently monitors several threads performing
+ * time-limited activities. Client threads call {@link #start(long)} and
+ * {@link #stop()} to denote the start and end of a time-limited activity, and
+ * calls to {@link #checkForTimeout()} will throw an
+ * {@link ActivityTimedOutException} if the calling thread has exceeded the
+ * maximum run time defined in {@link #start(long) start}.
+ *
+ * Any class can call {@link #checkForTimeout()}, making it easy for objects
+ * shared by threads to check whether the current thread is over-running its
+ * activity. The call is intended to be very lightweight (no synchronisation)
+ * so it can be called in tight loops.
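+ * <p>
+ * A minimal sketch of the intended call pattern (the worker methods below are
+ * hypothetical):
+ * <pre>
+ * while (moreWorkToDo()) {
+ *   ActivityTimeMonitor.checkForTimeout(); // cheap check; throws on over-run
+ *   doSomeWork();
+ * }
+ * </pre>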
+ */
+public class ActivityTimeMonitor {
+
+ /** Monitors the current timeouts and signals when a thread has timed out. */
+ private static final class TimeoutThread extends Thread {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ synchronized (timeLimitedThreads) {
+ long now = System.currentTimeMillis();
+ long waitTime = nextAnticipatedTimeout - now;
+ if (waitTime <= 0) {
+ // Something may have timed out - check all threads, adding
+ // all currently timed out threads to timedOutThreads
+ long newFirstAnticipatedTimeout = Long.MAX_VALUE;
+ boolean needToRemoveThread = false;
+ for (Entry<Thread, ActivityTime> timeout : timeLimitedThreads.entrySet()) {
+ long timeoutTime = timeout.getValue().scheduledTimeout;
+ if (timeoutTime <= now) {
+ // we cannot remove a thread while iterating on the map, so
+ // signal that a thread should be removed later.
+ needToRemoveThread = true;
+ // add to list of bad threads
+ timedOutThreads.put(timeout.getKey(), timeout.getValue());
+ anActivityHasTimedOut = true;
+ } else {
+ // assess time of next potential failure
+ newFirstAnticipatedTimeout = Math.min(newFirstAnticipatedTimeout, timeoutTime);
+ }
+ }
+ if (needToRemoveThread) {
+ for (Thread t : timedOutThreads.keySet()) {
+ timeLimitedThreads.remove(t);
+ }
+ }
+ nextAnticipatedTimeout = newFirstAnticipatedTimeout;
+ // Loop round rather than waiting indefinitely: the next iteration
+ // waits on the revised deadline for any remaining threads.
+ } else {
+ // Wait until the next anticipated time of failure or (hopefully, more
+ // likely) until notified of a new, earlier anticipated failure time.
+ timeLimitedThreads.wait(waitTime);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /** The earliest timeout we can anticipate */
+ static private volatile long nextAnticipatedTimeout = Long.MAX_VALUE;
+
+ /** Set of threads that are known to have overrun */
+ static private WeakHashMap<Thread, ActivityTime> timedOutThreads = new WeakHashMap<Thread, ActivityTime>();
+
+ /** List of active threads that have called start(maxTimeMilliseconds) */
+ static private WeakHashMap<Thread, ActivityTime> timeLimitedThreads = new WeakHashMap<Thread, ActivityTime>();
+
+ /** an internal thread that monitors timeout activity */
+ private static TimeoutThread timeoutThread;
+
+ static public volatile boolean anActivityHasTimedOut;
+
+ static {
+ timeoutThread = new TimeoutThread();
+ timeoutThread.setDaemon(true);
+ timeoutThread.start();
+ }
+
+ /** Checks whether the current thread has been flagged as timed out and, if so, throws {@link ActivityTimedOutException}. */
+ private static void checkIsThreadTimeout() {
+ Thread currentThread = Thread.currentThread();
+ synchronized (timeLimitedThreads) {
+ ActivityTime thisTimeOut = timedOutThreads.remove(currentThread);
+ if (thisTimeOut != null) {
+ if (timedOutThreads.size() == 0) {
+ // All errors reported - reset the volatile variable that will be
+ // signalling error state
+ anActivityHasTimedOut = false;
+ }
+ long overrun = System.currentTimeMillis() - thisTimeOut.scheduledTimeout;
+
+ throw new ActivityTimedOutException("Thread " + currentThread
+ + " has timed out, overrun =" + overrun + " ms", overrun);
+ }
+ }
+ }
+
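+ /**
+ * Throws {@link ActivityTimedOutException} if the current thread's activity
+ * has over-run its allotted time. Cheap enough for tight loops: a volatile
+ * read, plus the synchronized check only once some activity has timed out.
+ * (Reconstructed sketch: this method is referenced throughout the patch but
+ * its body is missing from this extract, so the delegation below is an
+ * assumption.)
+ */
+ public static final void checkForTimeout() {
+ if (anActivityHasTimedOut) {
+ checkIsThreadTimeout();
+ }
+ }
+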
+ /**
+ * Checks to see if this thread is likely to exceed its pre-determined timeout.
+ * This is a heavier-weight call than {@link #checkForTimeout()} and should not
+ * be called quite as frequently.
+ *
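+ * For example (hypothetical caller processing item i of n):
+ * <pre>
+ * if (ActivityTimeMonitor.isProjectedToTimeout((float) i / n)) {
+ *   // unlikely to finish in time - e.g. return partial results now
+ * }
+ * </pre>
+ *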
+ * @param progress a value between 0 and 1 indicating percent progress
+ * @return true if the calling thread is likely to exceed the timeout threshold passed to the start method
+ * given the % progress made so far
+ */
+ public static final boolean isProjectedToTimeout(float progress) {
+ Thread currentThread = Thread.currentThread();
+ synchronized (timeLimitedThreads) {
+ ActivityTime thisTimeOut = timeLimitedThreads.get(currentThread);
+ if (thisTimeOut != null) {
+ long now = System.currentTimeMillis();
+ long maxDuration = thisTimeOut.scheduledTimeout - thisTimeOut.startTime;
+ long durationSoFar = now - thisTimeOut.startTime;
+ float expectedMinimumProgress = (float) durationSoFar / maxDuration;
+ return progress < expectedMinimumProgress;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Called by a client thread to signal the start of a time-limited activity.
+ * {@link #stop()} MUST ALWAYS be called after calling this method, usually in
+ * a finally block:
+ * <pre>
+ * ActivityTimeMonitor.start(100);
+ * try {
+ *   ... do some activity ...
+ * } finally {
+ *   ActivityTimeMonitor.stop();
+ * }
+ * </pre>
+ *
+ * @param maxTimeMilliseconds
+ * specifies the maximum length of time that this thread is permitted
+ * to execute a task.
+ */
+ public static void start(long maxTimeMilliseconds) {
+ long now=System.currentTimeMillis();
+ long scheduledTimeout = now + maxTimeMilliseconds;
+ Thread currentThread = Thread.currentThread();
+ synchronized (timeLimitedThreads) {
+ // store the scheduled point in time when the current thread should fail
+ timeLimitedThreads.put(currentThread, new ActivityTime(now,scheduledTimeout));
+
+ // if we are not in the middle of handling a timeout
+ if (!anActivityHasTimedOut) {
+ // check to see if this is now the first thread expected to time out...
+ if (scheduledTimeout < nextAnticipatedTimeout) {
+ nextAnticipatedTimeout = scheduledTimeout;
+
+ // Wake the TimeoutThread so it waits on this new, earlier (more
+ // aggressive) deadline
+ timeLimitedThreads.notify();
+ }
+ }
+ }
+ }
+
+ /**
+ * Called by a client thread to signal that monitoring an activity should be
+ * stopped.
+ *
+ * @see #start(long)
+ */
+ public static void stop() {
+ Thread currentThread = Thread.currentThread();
+ synchronized (timeLimitedThreads) {
+
+ ActivityTime thisTimeOut = timeLimitedThreads.remove(currentThread);
+ // Choosing not to throw an exception if the thread has timed out - we
+ // don't punish overruns detected at this final stage, though that could
+ // be made an option.
+ if (timedOutThreads.remove(currentThread) != null) {
+ if (timedOutThreads.size() == 0) {
+ // All errors reported - reset volatile variable
+ anActivityHasTimedOut = false;
+ }
+ }
+
+ // thisTimeOut is null if this thread already timed out and was removed
+ // from timeLimitedThreads by the monitor
+ if (thisTimeOut != null) {
+ if (thisTimeOut.scheduledTimeout <= nextAnticipatedTimeout) {
+ // this was the first thread expected to timeout -
+ // resetFirstAnticipatedFailure
+ findNextAnticipatedFailure();
+ }
+ }
+ }
+ }
+
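+ /**
+ * Recomputes {@link #nextAnticipatedTimeout} from the remaining monitored
+ * threads and wakes the timeout thread so it waits on the revised deadline.
+ * Must be called while holding the lock on timeLimitedThreads.
+ * (Reconstructed sketch: this helper is called from stop() but its body is
+ * missing from this extract, so the implementation below is an assumption.)
+ */
+ private static void findNextAnticipatedFailure() {
+ long newFirstAnticipatedTimeout = Long.MAX_VALUE;
+ for (ActivityTime at : timeLimitedThreads.values()) {
+ newFirstAnticipatedTimeout = Math.min(newFirstAnticipatedTimeout, at.scheduledTimeout);
+ }
+ nextAnticipatedTimeout = newFirstAnticipatedTimeout;
+ timeLimitedThreads.notify();
+ }
+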
+ static class ActivityTime {
+ long startTime;
+ long scheduledTimeout;
+ public ActivityTime(long startTime, long timeOutTime) {
+ this.startTime=startTime;
+ this.scheduledTimeout=timeOutTime;
+ }
+ }
+
+}
Index: src/test/org/apache/lucene/index/TestIndexReader.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexReader.java (revision 908992)
+++ src/test/org/apache/lucene/index/TestIndexReader.java (working copy)
@@ -71,6 +71,12 @@
super(name);
}
+ // Allow subclasses to override the reader class under test
+ public IndexReader getReader(Directory dir,boolean readOnly) throws CorruptIndexException, IOException
+ {
+ return IndexReader.open(dir, readOnly);
+ }
+
public void testCommitUserData() throws Exception {
RAMDirectory d = new MockRAMDirectory();
@@ -84,14 +90,14 @@
addDocumentWithFields(writer);
writer.close();
- IndexReader r = IndexReader.open(d, false);
+ IndexReader r = getReader(d, false);
r.deleteDocument(5);
r.flush(commitUserData);
r.close();
SegmentInfos sis = new SegmentInfos();
sis.read(d);
- IndexReader r2 = IndexReader.open(d, false);
+ IndexReader r2 = getReader(d, false);
IndexCommit c = r.getIndexCommit();
assertEquals(c.getUserData(), commitUserData);
@@ -129,7 +135,7 @@
addDocumentWithFields(writer);
writer.close();
// set up reader:
- IndexReader reader = IndexReader.open(d, false);
+ IndexReader reader = getReader(d, false);
assertTrue(reader.isCurrent());
// modify index by adding another document:
writer = new IndexWriter(d, new StandardAnalyzer(TEST_VERSION_CURRENT), false, IndexWriter.MaxFieldLength.LIMITED);
@@ -157,7 +163,7 @@
addDocumentWithFields(writer);
writer.close();
// set up reader
- IndexReader reader = IndexReader.open(d, false);
+ IndexReader reader = getReader(d, false);
Collection fieldNames = reader.getFieldNames(IndexReader.FieldOption.ALL);
assertTrue(fieldNames.contains("keyword"));
assertTrue(fieldNames.contains("text"));
@@ -184,7 +190,7 @@
writer.close();
// verify fields again
- reader = IndexReader.open(d, false);
+ reader = getReader(d, false);
fieldNames = reader.getFieldNames(IndexReader.FieldOption.ALL);
assertEquals(13, fieldNames.size()); // the following fields
assertTrue(fieldNames.contains("keyword"));
@@ -259,7 +265,7 @@
writer.addDocument(doc);
}
writer.close();
- IndexReader reader = IndexReader.open(d, false);
+ IndexReader reader = getReader(d, false);
FieldSortedTermVectorMapper mapper = new FieldSortedTermVectorMapper(new TermVectorEntryFreqSortedComparator());
reader.getTermFreqVector(0, mapper);
Map<String,SortedSet<TermVectorEntry>> map = mapper.getFieldToTerms();
@@ -322,14 +328,14 @@
// OPEN READER AT THIS POINT - this should fix the view of the
// index at the point of having 100 "aaa" documents and 0 "bbb"
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
assertEquals("first docFreq", 100, reader.docFreq(searchTerm));
assertTermDocsCount("first reader", reader, searchTerm, 100);
reader.close();
// DELETE DOCUMENTS CONTAINING TERM: aaa
int deleted = 0;
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
deleted = reader.deleteDocuments(searchTerm);
assertEquals("deleted count", 100, deleted);
assertEquals("deleted docFreq", 100, reader.docFreq(searchTerm));
@@ -338,11 +344,11 @@
// open a 2nd reader to make sure first reader can
// commit its changes (.del) while second reader
// is open:
- IndexReader reader2 = IndexReader.open(dir, false);
+ IndexReader reader2 = getReader(dir, false);
reader.close();
// CREATE A NEW READER and re-test
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
assertEquals("deleted docFreq", 100, reader.docFreq(searchTerm));
assertTermDocsCount("deleted termDocs", reader, searchTerm, 0);
reader.close();
@@ -370,7 +376,7 @@
doc.add(new Field("junk", "junk text", Field.Store.NO, Field.Index.ANALYZED));
writer.addDocument(doc);
writer.close();
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
doc = reader.document(reader.maxDoc() - 1);
Field[] fields = doc.getFields("bin1");
assertNotNull(fields);
@@ -404,7 +410,7 @@
writer = new IndexWriter(dir, new WhitespaceAnalyzer(TEST_VERSION_CURRENT), false, IndexWriter.MaxFieldLength.LIMITED);
writer.optimize();
writer.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
doc = reader.document(reader.maxDoc() - 1);
fields = doc.getFields("bin1");
assertNotNull(fields);
@@ -437,7 +443,7 @@
}
writer.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
// Close reader:
reader.close();
@@ -482,7 +488,7 @@
}
// Create reader:
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
// Try to make changes
try {
@@ -529,7 +535,7 @@
writer.close();
// now open reader & set norm for doc 0
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
reader.setNorm(0, "content", (float) 2.0);
// we should be holding the write lock now:
@@ -541,7 +547,7 @@
assertTrue("not locked", !IndexWriter.isLocked(dir));
// open a 2nd reader:
- IndexReader reader2 = IndexReader.open(dir, false);
+ IndexReader reader2 = getReader(dir, false);
// set norm again for doc 0
reader.setNorm(0, "content", (float) 3.0);
@@ -576,12 +582,12 @@
// now open reader & set norm for doc 0 (writes to
// _0_1.s0)
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
reader.setNorm(0, "content", (float) 2.0);
reader.close();
// now open reader again & set norm for doc 0 (writes to _0_2.s0)
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
reader.setNorm(0, "content", (float) 2.0);
reader.close();
assertFalse("failed to remove first generation norms file on writing second generation",
@@ -602,7 +608,7 @@
fileDirName.mkdir();
}
try {
- IndexReader.open(fileDirName);
+ getReader(fileDirName);
fail("opening IndexReader on empty directory failed to produce FileNotFoundException");
} catch (FileNotFoundException e) {
// GOOD
@@ -632,7 +638,7 @@
// OPEN READER AT THIS POINT - this should fix the view of the
// index at the point of having 100 "aaa" documents and 0 "bbb"
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
assertEquals("first docFreq", 100, reader.docFreq(searchTerm));
assertEquals("first docFreq", 0, reader.docFreq(searchTerm2));
assertTermDocsCount("first reader", reader, searchTerm, 100);
@@ -674,7 +680,7 @@
// Re-open index reader and try again. This time it should see
// the new data.
reader.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
assertEquals("first docFreq", 100, reader.docFreq(searchTerm));
assertEquals("first docFreq", 100, reader.docFreq(searchTerm2));
assertTermDocsCount("first reader", reader, searchTerm, 100);
@@ -689,7 +695,7 @@
reader.close();
// CREATE A NEW READER and re-test
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
assertEquals("deleted docFreq", 100, reader.docFreq(searchTerm));
assertEquals("deleted docFreq", 100, reader.docFreq(searchTerm2));
assertTermDocsCount("deleted termDocs", reader, searchTerm, 0);
@@ -723,7 +729,7 @@
// Now open existing directory and test that reader closes all files
dir = getDirectory();
- IndexReader reader1 = IndexReader.open(dir, false);
+ IndexReader reader1 = getReader(dir, false);
reader1.close();
dir.close();
@@ -747,7 +753,7 @@
assertTrue(IndexWriter.isLocked(dir)); // writer open, so dir is locked
writer.close();
assertTrue(IndexReader.indexExists(dir));
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
assertFalse(IndexWriter.isLocked(dir)); // reader only, no lock
long version = IndexReader.lastModified(dir);
if (i == 1) {
@@ -762,7 +768,7 @@
writer = new IndexWriter(dir, new WhitespaceAnalyzer(TEST_VERSION_CURRENT), true, IndexWriter.MaxFieldLength.LIMITED);
addDocumentWithFields(writer);
writer.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
assertTrue("old lastModified is " + version + "; new lastModified is " + IndexReader.lastModified(dir), version <= IndexReader.lastModified(dir));
reader.close();
dir.close();
@@ -781,7 +787,7 @@
assertTrue(IndexWriter.isLocked(dir)); // writer open, so dir is locked
writer.close();
assertTrue(IndexReader.indexExists(dir));
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
assertFalse(IndexWriter.isLocked(dir)); // reader only, no lock
long version = IndexReader.getCurrentVersion(dir);
reader.close();
@@ -790,7 +796,7 @@
writer = new IndexWriter(dir, new WhitespaceAnalyzer(TEST_VERSION_CURRENT), true, IndexWriter.MaxFieldLength.LIMITED);
addDocumentWithFields(writer);
writer.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
assertTrue("old version is " + version + "; new version is " + IndexReader.getCurrentVersion(dir), version < IndexReader.getCurrentVersion(dir));
reader.close();
dir.close();
@@ -802,7 +808,7 @@
addDocumentWithFields(writer);
writer.close();
writer = new IndexWriter(dir, new WhitespaceAnalyzer(TEST_VERSION_CURRENT), false, IndexWriter.MaxFieldLength.LIMITED);
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
try {
reader.deleteDocument(0);
fail("expected lock");
@@ -822,12 +828,12 @@
addDocumentWithFields(writer);
addDocumentWithFields(writer);
writer.close();
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
reader.deleteDocument(0);
reader.deleteDocument(1);
reader.undeleteAll();
reader.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
assertEquals(2, reader.numDocs()); // nothing has really been deleted thanks to undeleteAll()
reader.close();
dir.close();
@@ -839,11 +845,11 @@
addDocumentWithFields(writer);
addDocumentWithFields(writer);
writer.close();
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
reader.deleteDocument(0);
reader.deleteDocument(1);
reader.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
reader.undeleteAll();
assertEquals(2, reader.numDocs()); // nothing has really been deleted thanks to undeleteAll()
reader.close();
@@ -856,14 +862,14 @@
addDocumentWithFields(writer);
addDocumentWithFields(writer);
writer.close();
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
reader.deleteDocument(0);
reader.deleteDocument(1);
reader.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
reader.undeleteAll();
reader.close();
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
assertEquals(2, reader.numDocs()); // nothing has really been deleted thanks to undeleteAll()
reader.close();
dir.close();
@@ -914,7 +920,7 @@
// the same files again.
dir.setPreventDoubleWrite(false);
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
// For each disk size, first try to commit against
// dir that will hit random IOExceptions & disk
@@ -1018,7 +1024,7 @@
// changed (transactional semantics):
IndexReader newReader = null;
try {
- newReader = IndexReader.open(dir, false);
+ newReader = getReader(dir, false);
} catch (IOException e) {
e.printStackTrace();
fail(testName + ":exception when creating IndexReader after disk full during close: " + e);
@@ -1085,7 +1091,7 @@
addDoc(writer, "aaa");
}
writer.close();
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
// Try to delete an invalid docId, yet, within range
// of the final bits of the BitVector:
@@ -1124,7 +1130,7 @@
addDoc(writer, "aaa");
writer.close();
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
try {
reader.deleteDocument(1);
fail("did not hit exception when deleting an invalid doc number");
@@ -1136,7 +1142,7 @@
fail("write lock is still held after close");
}
- reader = IndexReader.open(dir, false);
+ reader = getReader(dir, false);
try {
reader.setNorm(1, "content", (float) 2.0);
fail("did not hit exception when calling setNorm on an invalid doc number");
@@ -1166,7 +1172,7 @@
"deletetest");
Directory dir = FSDirectory.open(dirFile);
try {
- IndexReader.open(dir, false);
+ getReader(dir, false);
fail("expected FileNotFoundException");
} catch (FileNotFoundException e) {
// expected
@@ -1176,7 +1182,7 @@
// Make sure we still get a CorruptIndexException (not NPE):
try {
- IndexReader.open(dir, false);
+ getReader(dir, false);
fail("expected FileNotFoundException");
} catch (FileNotFoundException e) {
// expected
@@ -1209,7 +1215,7 @@
// OPEN TWO READERS
// Both readers get segment info as exists at this time
- IndexReader reader1 = IndexReader.open(dir, false);
+ IndexReader reader1 = getReader(dir, false);
assertEquals("first opened", 100, reader1.docFreq(searchTerm1));
assertEquals("first opened", 100, reader1.docFreq(searchTerm2));
assertEquals("first opened", 100, reader1.docFreq(searchTerm3));
@@ -1217,7 +1223,7 @@
assertTermDocsCount("first opened", reader1, searchTerm2, 100);
assertTermDocsCount("first opened", reader1, searchTerm3, 100);
- IndexReader reader2 = IndexReader.open(dir, false);
+ IndexReader reader2 = getReader(dir, false);
assertEquals("first opened", 100, reader2.docFreq(searchTerm1));
assertEquals("first opened", 100, reader2.docFreq(searchTerm2));
assertEquals("first opened", 100, reader2.docFreq(searchTerm3));
@@ -1258,7 +1264,7 @@
// RECREATE READER AND TRY AGAIN
reader1.close();
- reader1 = IndexReader.open(dir, false);
+ reader1 = getReader(dir, false);
assertEquals("reopened", 100, reader1.docFreq(searchTerm1));
assertEquals("reopened", 100, reader1.docFreq(searchTerm2));
assertEquals("reopened", 100, reader1.docFreq(searchTerm3));
@@ -1276,7 +1282,7 @@
reader1.close();
// Open another reader to confirm that everything is deleted
- reader2 = IndexReader.open(dir, false);
+ reader2 = getReader(dir, false);
assertEquals("reopened 2", 100, reader2.docFreq(searchTerm1));
assertEquals("reopened 2", 100, reader2.docFreq(searchTerm2));
assertEquals("reopened 2", 100, reader2.docFreq(searchTerm3));
@@ -1428,7 +1434,7 @@
SegmentInfos sis = new SegmentInfos();
sis.read(d);
- IndexReader r = IndexReader.open(d, false);
+ IndexReader r = getReader(d, false);
IndexCommit c = r.getIndexCommit();
assertEquals(sis.getCurrentSegmentFileName(), c.getSegmentsFileName());
@@ -1467,7 +1473,7 @@
addDocumentWithFields(writer);
writer.close();
- IndexReader r = IndexReader.open(d, true);
+ IndexReader r = getReader(d, true);
try {
r.deleteDocument(0);
fail();
@@ -1526,12 +1532,12 @@
writer.addDocument(createDocument("b"));
writer.addDocument(createDocument("c"));
writer.close();
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
reader.deleteDocuments(new Term("id", "a"));
reader.flush();
reader.deleteDocuments(new Term("id", "b"));
reader.close();
- IndexReader.open(dir,true).close();
+ getReader(dir,true).close();
}
// LUCENE-1647
@@ -1544,14 +1550,14 @@
writer.addDocument(createDocument("b"));
writer.addDocument(createDocument("c"));
writer.close();
- IndexReader reader = IndexReader.open(dir, false);
+ IndexReader reader = getReader(dir, false);
reader.deleteDocuments(new Term("id", "a"));
reader.flush();
reader.deleteDocuments(new Term("id", "b"));
reader.undeleteAll();
reader.deleteDocuments(new Term("id", "b"));
reader.close();
- IndexReader.open(dir,true).close();
+ getReader(dir,true).close();
dir.close();
}
@@ -1567,7 +1573,7 @@
public void testNoDir() throws Throwable {
Directory dir = FSDirectory.open(_TestUtil.getTempDir("doesnotexist"));
try {
- IndexReader.open(dir, true);
+ getReader(dir, true);
fail("did not hit expected exception");
} catch (NoSuchDirectoryException nsde) {
// expected
@@ -1644,7 +1650,7 @@
writer.commit();
// Open reader1
- IndexReader r = IndexReader.open(dir, false);
+ IndexReader r = getReader(dir, false);
IndexReader r1 = SegmentReader.getOnlySegmentReader(r);
final int[] ints = FieldCache.DEFAULT.getInts(r1, "number");
assertEquals(1, ints.length);
@@ -1676,7 +1682,7 @@
writer.commit();
// Open reader1
- IndexReader r = IndexReader.open(dir, false);
+ IndexReader r = getReader(dir, false);
assertTrue(r instanceof DirectoryReader);
IndexReader r1 = SegmentReader.getOnlySegmentReader(r);
final int[] ints = FieldCache.DEFAULT.getInts(r1, "number");
@@ -1718,7 +1724,7 @@
writer.addDocument(doc);
writer.commit();
- IndexReader r = IndexReader.open(dir, false);
+ IndexReader r = getReader(dir, false);
IndexReader r1 = SegmentReader.getOnlySegmentReader(r);
assertEquals(36, r1.getUniqueTermCount());
writer.addDocument(doc);
@@ -1783,7 +1789,7 @@
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(TEST_VERSION_CURRENT), IndexWriter.MaxFieldLength.UNLIMITED);
Document doc = new Document();
writer.addDocument(doc);
- IndexReader r = IndexReader.open(dir, true);
+ IndexReader r = getReader(dir, true);
assertTrue(r.isCurrent());
writer.addDocument(doc);
writer.prepareCommit();
Index: src/test/org/apache/lucene/index/TestTimeLimitingIndexReader.java
===================================================================
--- src/test/org/apache/lucene/index/TestTimeLimitingIndexReader.java (revision 0)
+++ src/test/org/apache/lucene/index/TestTimeLimitingIndexReader.java (revision 0)
@@ -0,0 +1,19 @@
+package org.apache.lucene.index;
+
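+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+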
+import java.io.IOException;
+
+import org.apache.lucene.store.Directory;
+
+public class TestTimeLimitingIndexReader extends TestIndexReader{
+ public TestTimeLimitingIndexReader(String name) {
+ super(name);
+ }
+ @Override
+ public IndexReader getReader(Directory dir, boolean readOnly)
+ throws CorruptIndexException, IOException {
+ return new TimeLimitingIndexReader(super.getReader(dir, readOnly));
+ }
+
+ //TODO the base class exercises the usual IndexReader behaviour and confirms we have wrapped
+ //correctly, but we still need tests here for successful timing-out of reader operations.
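+
+ // A hedged sketch of such a test; the single-document index, tiny time budget
+ // and sleep below are illustrative assumptions, not part of the original patch.
+ public void testReaderOperationTimesOut() throws Exception {
+ org.apache.lucene.store.RAMDirectory dir = new org.apache.lucene.store.RAMDirectory();
+ IndexWriter writer = new IndexWriter(dir,
+ new org.apache.lucene.analysis.WhitespaceAnalyzer(TEST_VERSION_CURRENT),
+ true, IndexWriter.MaxFieldLength.LIMITED);
+ org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
+ doc.add(new org.apache.lucene.document.Field("content", "aaa",
+ org.apache.lucene.document.Field.Store.NO,
+ org.apache.lucene.document.Field.Index.ANALYZED));
+ writer.addDocument(doc);
+ writer.close();
+
+ IndexReader reader = getReader(dir, true);
+ org.apache.lucene.util.ActivityTimeMonitor.start(10); // deliberately small budget
+ try {
+ Thread.sleep(200); // overrun the budget before touching the reader
+ reader.docFreq(new Term("content", "aaa")); // wrapped call should detect the overrun
+ fail("expected ActivityTimedOutException");
+ } catch (org.apache.lucene.util.ActivityTimedOutException e) {
+ // expected
+ } finally {
+ org.apache.lucene.util.ActivityTimeMonitor.stop();
+ reader.close();
+ }
+ }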
+}
Index: src/test/org/apache/lucene/index/TimeLimitingIndexReaderBenchmark.java
===================================================================
--- src/test/org/apache/lucene/index/TimeLimitingIndexReaderBenchmark.java (revision 0)
+++ src/test/org/apache/lucene/index/TimeLimitingIndexReaderBenchmark.java (revision 0)
@@ -0,0 +1,159 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.lucene.queryParser.QueryParser;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TimeLimitingCollector;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.ActivityTimeMonitor;
+import org.apache.lucene.util.Version;
+
+public class TimeLimitingIndexReaderBenchmark {
+
+ static class MyCollector extends Collector {
+ int numMatches = 0;
+
+ @Override
+ public boolean acceptsDocsOutOfOrder() { return false; }
+
+ @Override
+ public void collect(int doc) throws IOException { numMatches++; }
+
+ @Override
+ public void setNextReader(IndexReader reader, int docBase)
+ throws IOException {}
+
+ @Override
+ public void setScorer(Scorer scorer) throws IOException {}
+ }
+
+ private static void runHeavyTermEnumTermDocsAccess(IndexReader r, String msg)
+ throws IOException {
+ long start = System.currentTimeMillis();
+ TermEnum te = r.terms();
+ int numTerms = 0;
+ while (te.next()) {
+ TermDocs td = r.termDocs(te.term());
+ while (td.next()) {}
+ numTerms++;
+ if (numTerms > 200000) {
+ break;
+ }
+
+ }
+ long diff = System.currentTimeMillis() - start;
+ System.out.println("Read term docs for 200000 terms in " + diff
+ + " ms using " + msg);
+ }
+
+ private static void runQueryNoTimeoutProtection(Query q, IndexReader r)
+ throws IOException, ParseException {
+ long start = System.currentTimeMillis();
+ IndexSearcher s = new IndexSearcher(r);
+
+ MyCollector hc = new MyCollector();
+ s.search(q, hc);
+ long diff = System.currentTimeMillis() - start;
+ System.out.println(q + " no time limit matched " + hc.numMatches
+ + " docs in \t" + diff + " ms");
+ }
+
+ private static void runQueryTimeLimitedCollector(Query q, IndexReader r,
+ int maxWait) throws IOException, ParseException {
+ long start = System.currentTimeMillis();
+ IndexSearcher s = new IndexSearcher(r);
+ MyCollector hc = new MyCollector();
+ s.search(q, new TimeLimitingCollector(hc, maxWait));
+ long diff = System.currentTimeMillis() - start;
+ System.out.println(q + " time limited collector matched " + hc.numMatches
+ + " docs in \t" + diff + " ms");
+ }
+
+ private static void runQueryTimeLimitedReader(Query q, IndexReader r)
+ throws IOException, ParseException {
+ long start = System.currentTimeMillis();
+ IndexSearcher s = new IndexSearcher(r);
+ MyCollector hc = new MyCollector();
+ s.search(q, hc);
+ long diff = System.currentTimeMillis() - start;
+ System.out.println(q + " time limited reader matched " + hc.numMatches
+ + " docs in \t" + diff + " ms");
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ IndexReader r = IndexReader.open(FSDirectory.open(new File(
+ "D:/indexes/wikipedia")), true);
+ // IndexReader
+ // r=IndexReader.open("/Users/Mark/Work/indexes/wikipediaNov2008");
+ TimeLimitingIndexReader tlir = new TimeLimitingIndexReader(r);
+
+ // Warmup
+ runHeavyTermEnumTermDocsAccess(r, "no timeout limit (warm up)");
+ runHeavyTermEnumTermDocsAccess(r, "no timeout limit (warm up)");
+
+ runHeavyTermEnumTermDocsAccess(r, "no timeout limit");
+ ActivityTimeMonitor.start(30000);
+ try {
+ runHeavyTermEnumTermDocsAccess(tlir, " reader-limited access");
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+
+ // ======================
+ int maxTimeForFuzzyQueryFromHell = 10000;
+ QueryParser qp = new QueryParser(Version.LUCENE_CURRENT, "text",
+ new WhitespaceAnalyzer(Version.LUCENE_CURRENT));
+ Query q = qp.parse("f* AND a* AND b*");
+
+ runQueryNoTimeoutProtection(q, r);
+ runQueryTimeLimitedCollector(q, r, maxTimeForFuzzyQueryFromHell);
+ ActivityTimeMonitor.start(maxTimeForFuzzyQueryFromHell);
+ try {
+ runQueryTimeLimitedReader(q, tlir);
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+ System.out.println();
+
+ q = qp.parse("accomodation~");
+ runQueryNoTimeoutProtection(q, r);
+ // can timeout significantly later than desired because cost is in fuzzy
+ // analysis, not collection
+ runQueryTimeLimitedCollector(q, r, maxTimeForFuzzyQueryFromHell);
+
+ ActivityTimeMonitor.start(maxTimeForFuzzyQueryFromHell);
+ try {
+ // Times out much more readily
+ runQueryTimeLimitedReader(q, tlir);
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+
+ }
+
+}
Index: src/test/org/apache/lucene/util/TestActivityTimeMonitor.java
===================================================================
--- src/test/org/apache/lucene/util/TestActivityTimeMonitor.java (revision 0)
+++ src/test/org/apache/lucene/util/TestActivityTimeMonitor.java (revision 0)
@@ -0,0 +1,117 @@
+package org.apache.lucene.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class TestActivityTimeMonitor {
+
+ private static final class Activity {
+ public void doSomething() {
+ ActivityTimeMonitor.checkForTimeout();
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private static final class ActivityThread extends Thread {
+ public boolean timeoutSignalled = false;
+ @Override
+ public void run() {
+ Activity act = new Activity();
+ ActivityTimeMonitor.start(100);
+ try {
+ try {
+ sleep(200); // sleep long enough to timeout
+ } catch (InterruptedException e) {
+ }
+ // This should throw ActivityTimedOutException.
+ act.doSomething();
+ } catch (ActivityTimedOutException e) {
+ timeoutSignalled = true;
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+ }
+ }
+
+ @Test(expected=ActivityTimedOutException.class)
+ public void testTimedOutException() throws Exception {
+ Activity act = new Activity();
+ ActivityTimeMonitor.start(100);
+ try {
+ Thread.sleep(200); // sleep long enough to timeout
+ // This should throw ActivityTimedOutException. If it won't the test will
+ // fail.
+ act.doSomething();
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+ }
+
+ @Test
+ public void testMultipleThreadsFailing() throws Exception {
+ ActivityThread[] threads = new ActivityThread[10];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new ActivityThread();
+ threads[i].setName("thread-" + i);
+ }
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ for (ActivityThread thread: threads) {
+ assertTrue("thread " + thread.getName() + " failed", thread.timeoutSignalled);
+ }
+
+ }
+
+ @Test
+ public void testProjectedTimeoutTracking() throws Exception {
+ long start=System.currentTimeMillis();
+ int numCheckStages=3;
+ long activityDuration=500;
+ long accuracyBuffer=50;
+ long stageLength=activityDuration/numCheckStages;
+ float progress;
+ ActivityTimeMonitor.start(activityDuration+accuracyBuffer);
+ try {
+ for (int i = 0; i < numCheckStages; i++){
+ Thread.sleep(stageLength);
+ long now=System.currentTimeMillis();
+ long timeSoFar=Math.min(activityDuration, now-start);
+ progress=((float)timeSoFar)/(float)activityDuration;
+ assertFalse("Should have been flagged as on track "+progress,ActivityTimeMonitor.isProjectedToTimeout(progress));
+ assertTrue("Should have reported projected failure "+progress/2f, ActivityTimeMonitor.isProjectedToTimeout(progress/2f));
+ }
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+ }
+
+}