Index: src/java/org/apache/lucene/index/ActivityTimeMonitor.java =================================================================== --- src/java/org/apache/lucene/index/ActivityTimeMonitor.java (revision 0) +++ src/java/org/apache/lucene/index/ActivityTimeMonitor.java (revision 0) @@ -0,0 +1,252 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.WeakHashMap; +import java.util.Map.Entry; + +/** + * Utility class which efficiently monitors several threads performing + * time-limited activities. Client threads call {@link #start(long)} and + * {@link #stop()} to denote start and end of time-limited activity and calls to + * {@link #checkForTimeout()} will throw an {@link ActivityTimedOutException} if + * that thread has exceeded the max allowed time to run defined in + * {@link #start(long) start}. + *
+ * Any class can call {@link #checkForTimeout()} making it easy for objects
+ * shared by threads to simply check if the current thread is over-running in
+ * its activity. The call is intended to be very lightweight (no
+ * synchronisation) so it can be called in tight loops.
+ */
+public class ActivityTimeMonitor {
+
+ /** Monitors the current timeouts and signals when a thread has timed out. */
+ private static final class TimeoutThread extends Thread {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ synchronized (timeLimitedThreads) {
+ long now = System.currentTimeMillis();
+ long waitTime = nextAnticipatedTimeout - now;
+ if (waitTime <= 0) {
+ // Something may have timed out - check all threads, adding
+ // all currently timed out threads to timedOutThreads
+ long newFirstAnticipatedTimeout = Long.MAX_VALUE;
+ boolean needToRemoveThread = false;
+ for (Entry
+ *
+ * @param maxTimeMilliseconds
+ * specifies the maximum length of time that this thread is permitted
+ * to execute a task.
+ */
+ public static void start(long maxTimeMilliseconds) {
+ long scheduledTimeout = System.currentTimeMillis() + maxTimeMilliseconds;
+ Thread currentThread = Thread.currentThread();
+ synchronized (timeLimitedThreads) {
+ // store the scheduled point in time when the current thread should fail
+ timeLimitedThreads.put(currentThread, new Long(scheduledTimeout));
+
+ // if we are not in the middle of handling a timeout
+ if (!anActivityHasTimedOut) {
+ // check to see if this is now the first thread expected to time out...
+ if (scheduledTimeout < nextAnticipatedTimeout) {
+ nextAnticipatedTimeout = scheduledTimeout;
+
+ // Reset TimeOutThread with new earlier, more agressive deadline to on
+ // which to wait
+ timeLimitedThreads.notify();
+ }
+ }
+ }
+ }
+
+ /**
+ * Called by a client thread to signal that monitoring an activity should be
+ * stopped.
+ *
+ * @see #start(long)
+ */
+ public static void stop() {
+ Thread currentThread = Thread.currentThread();
+ synchronized (timeLimitedThreads) {
+
+ Long thisTimeOut = timeLimitedThreads.remove(currentThread);
+ // Choosing not to throw an exception if thread has timed out - we don't
+ // punish overruns at last stage
+ // but I guess that could be an option if you were feeling malicious
+ if (timedOutThreads.remove(currentThread) != null) {
+ if (timedOutThreads.size() == 0) {
+ // All errors reported - reset volatile variable
+ anActivityHasTimedOut = false;
+ }
+ }
+
+ if (thisTimeOut != null) {
+ // This thread may have timed out and been removed from timeLimitedThreads
+ if (thisTimeOut.longValue() <= nextAnticipatedTimeout) {
+ // this was the first thread expected to timeout -
+ // resetFirstAnticipatedFailure
+ findNextAnticipatedFailure();
+ }
+ }
+ }
+ }
+
+}
Property changes on: src\java\org\apache\lucene\index\ActivityTimeMonitor.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
Index: src/java/org/apache/lucene/index/ActivityTimedOutException.java
===================================================================
--- src/java/org/apache/lucene/index/ActivityTimedOutException.java (revision 0)
+++ src/java/org/apache/lucene/index/ActivityTimedOutException.java (revision 0)
@@ -0,0 +1,33 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class ActivityTimedOutException extends RuntimeException {
+
+ private long overrunMilliseconds;
+
+ public ActivityTimedOutException(String msg, long overrun) {
+ super(msg);
+ overrunMilliseconds = overrun;
+ }
+
+ public long getOverrunMilliseconds() {
+ return overrunMilliseconds;
+ }
+
+}
Property changes on: src\java\org\apache\lucene\index\ActivityTimedOutException.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
Index: src/java/org/apache/lucene/index/TimeLimitedIndexReader.java
===================================================================
--- src/java/org/apache/lucene/index/TimeLimitedIndexReader.java (revision 0)
+++ src/java/org/apache/lucene/index/TimeLimitedIndexReader.java (revision 0)
@@ -0,0 +1,222 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.FieldSelector;
+
+/**
+ * IndexReader that checks if client threads using this class are not
+ * over-running a maximum allotted time. Clients must use the
+ * {@link ActivityTimeMonitor} class's start and stop methods to define scope
+ * and this class wraps all calls to the underlying index with calls to
+ * ActivityTimeMonitor's checkForTimeout method which will throw a runtime
+ * exeption {@link ActivityTimedOutException} in the event of an activity
+ * over-run.
+ */
+public class TimeLimitedIndexReader extends FilterIndexReader {
+
+ // Term Docs with timedout safety checks
+ private static class TimeLimitedTermDocs implements TermDocs {
+
+ private TermDocs termDocs;
+
+ public TimeLimitedTermDocs(TermDocs termDocs) {
+ this.termDocs = termDocs;
+ }
+
+ public void close() throws IOException {
+ termDocs.close();
+ }
+
+ public int doc() {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.doc();
+ }
+
+ public int freq() {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.freq();
+ }
+
+ public boolean next() throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.next();
+ }
+
+ public int read(int[] docs, int[] freqs) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.read(docs, freqs);
+ }
+
+ public void seek(Term term) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ termDocs.seek(term);
+ }
+
+ public void seek(TermEnum termEnum) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ termDocs.seek(termEnum);
+ }
+
+ public boolean skipTo(int target) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return termDocs.skipTo(target);
+ }
+ }
+
+ private static class TimeLimitedTermEnum extends TermEnum {
+
+ private TermEnum terms;
+
+ public TimeLimitedTermEnum(TermEnum terms) {
+ this.terms = terms;
+ }
+
+ @Override
+ public void close() throws IOException {
+ terms.close();
+ }
+
+ @Override
+ public int docFreq() {
+ ActivityTimeMonitor.checkForTimeout();
+ return terms.docFreq();
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return terms.next();
+ }
+
+ @Override
+ public Term term() {
+ ActivityTimeMonitor.checkForTimeout();
+ return terms.term();
+ }
+
+ }
+
+ private static class TimeLimitedTermPositions extends TimeLimitedTermDocs
+ implements TermPositions {
+
+ private TermPositions tp;
+
+ public TimeLimitedTermPositions(TermPositions termPositions) {
+ super(termPositions);
+ this.tp = termPositions;
+ }
+
+ public byte[] getPayload(byte[] data, int offset) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return tp.getPayload(data, offset);
+ }
+
+ public int getPayloadLength() {
+ ActivityTimeMonitor.checkForTimeout();
+ return tp.getPayloadLength();
+ }
+
+ public boolean isPayloadAvailable() {
+ ActivityTimeMonitor.checkForTimeout();
+ return tp.isPayloadAvailable();
+ }
+
+ public int nextPosition() throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return tp.nextPosition();
+ }
+ }
+
+ private boolean isSubReader;
+
+ public TimeLimitedIndexReader(IndexReader in) {
+ super(in);
+ }
+
+ public TimeLimitedIndexReader(IndexReader indexReader, boolean isSubReader) {
+ super(indexReader);
+ this.isSubReader = isSubReader;
+ }
+
+ @Override
+ public int docFreq(Term t) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return super.docFreq(t);
+ }
+
+ @Override
+ public Document document(int n) throws CorruptIndexException, IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return super.document(n);
+ }
+
+ @Override
+ public Document document(int n, FieldSelector fieldSelector)
+ throws CorruptIndexException, IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return super.document(n, fieldSelector);
+ }
+
+ @Override
+ public IndexReader[] getSequentialSubReaders() {
+ if (isSubReader)
+ return null;
+ IndexReader[] results = super.getSequentialSubReaders();
+ IndexReader[] tlResults = new IndexReader[results.length];
+ for (int i = 0; i < results.length; i++) {
+ tlResults[i] = new TimeLimitedIndexReader(results[i], true);
+ }
+ return tlResults;
+ }
+
+ @Override
+ public TermDocs termDocs() throws IOException {
+ return new TimeLimitedTermDocs(super.termDocs());
+ }
+
+ @Override
+ public TermDocs termDocs(Term term) throws IOException {
+ return new TimeLimitedTermDocs(super.termDocs(term));
+ }
+
+ @Override
+ public TermPositions termPositions() throws IOException {
+ return new TimeLimitedTermPositions(super.termPositions());
+ }
+
+ @Override
+ public TermPositions termPositions(Term term) throws IOException {
+ ActivityTimeMonitor.checkForTimeout();
+ return new TimeLimitedTermPositions(super.termPositions(term));
+ }
+
+ @Override
+ public TermEnum terms() throws IOException {
+ return new TimeLimitedTermEnum(super.terms());
+ }
+
+ @Override
+ public TermEnum terms(Term t) throws IOException {
+ return new TimeLimitedTermEnum(super.terms(t));
+ }
+
+}
Property changes on: src\java\org\apache\lucene\index\TimeLimitedIndexReader.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
Index: src/test/org/apache/lucene/index/TestActivityTimeMonitor.java
===================================================================
--- src/test/org/apache/lucene/index/TestActivityTimeMonitor.java (revision 0)
+++ src/test/org/apache/lucene/index/TestActivityTimeMonitor.java (revision 0)
@@ -0,0 +1,93 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class TestActivityTimeMonitor {
+
+ private static final class Activity {
+ public void doSomething() {
+ ActivityTimeMonitor.checkForTimeout();
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private static final class ActivityThread extends Thread {
+ public boolean success = false;
+ @Override
+ public void run() {
+ Activity act = new Activity();
+ ActivityTimeMonitor.start(100);
+ try {
+ try {
+ sleep(200); // sleep long enough to timeout
+ } catch (InterruptedException e) {
+ }
+ // This should throw ActivityTimedOutException.
+ act.doSomething();
+ } catch (ActivityTimedOutException e) {
+ success = true;
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+ }
+ }
+
+ @Test(expected=ActivityTimedOutException.class)
+ public void testTimedOutException() throws Exception {
+ Activity act = new Activity();
+ ActivityTimeMonitor.start(100);
+ try {
+ Thread.sleep(200); // sleep long enough to timeout
+ // This should throw ActivityTimedOutException. If it won't the test will
+ // fail.
+ act.doSomething();
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+ }
+
+ @Test
+ public void testMultipleThreadsFailing() throws Exception {
+ ActivityThread[] threads = new ActivityThread[10];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new ActivityThread();
+ threads[i].setName("thread-" + i);
+ }
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ for (ActivityThread thread: threads) {
+ assertTrue("thread " + thread.getName() + " failed", thread.success);
+ }
+
+ }
+
+}
Property changes on: src\test\org\apache\lucene\index\TestActivityTimeMonitor.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
Index: src/test/org/apache/lucene/index/TimeLimitedIndexReaderBenchmark.java
===================================================================
--- src/test/org/apache/lucene/index/TimeLimitedIndexReaderBenchmark.java (revision 0)
+++ src/test/org/apache/lucene/index/TimeLimitedIndexReaderBenchmark.java (revision 0)
@@ -0,0 +1,158 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.lucene.queryParser.QueryParser;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TimeLimitingCollector;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+
+public class TimeLimitedIndexReaderBenchmark {
+
+ static class MyCollector extends Collector {
+ int numMatches = 0;
+
+ @Override
+ public boolean acceptsDocsOutOfOrder() { return false; }
+
+ @Override
+ public void collect(int doc) throws IOException { numMatches++; }
+
+ @Override
+ public void setNextReader(IndexReader reader, int docBase)
+ throws IOException {}
+
+ @Override
+ public void setScorer(Scorer scorer) throws IOException {}
+ }
+
+ private static void runHeavyTermEnumTermDocsAccess(IndexReader r, String msg)
+ throws IOException {
+ long start = System.currentTimeMillis();
+ TermEnum te = r.terms();
+ int numTerms = 0;
+ while (te.next()) {
+ TermDocs td = r.termDocs(te.term());
+ while (td.next()) {}
+ numTerms++;
+ if (numTerms > 200000) {
+ break;
+ }
+
+ }
+ long diff = System.currentTimeMillis() - start;
+ System.out.println("Read term docs for 200000 terms in " + diff
+ + " ms using " + msg);
+ }
+
+ private static void runQueryNoTimeoutProtection(Query q, IndexReader r)
+ throws IOException, ParseException {
+ long start = System.currentTimeMillis();
+ IndexSearcher s = new IndexSearcher(r);
+
+ MyCollector hc = new MyCollector();
+ s.search(q, hc);
+ long diff = System.currentTimeMillis() - start;
+ System.out.println(q + " no time limit matched " + hc.numMatches
+ + " docs in \t" + diff + " ms");
+ }
+
+ private static void runQueryTimeLimitedCollector(Query q, IndexReader r,
+ int maxWait) throws IOException, ParseException {
+ long start = System.currentTimeMillis();
+ IndexSearcher s = new IndexSearcher(r);
+ MyCollector hc = new MyCollector();
+ s.search(q, new TimeLimitingCollector(hc, maxWait));
+ long diff = System.currentTimeMillis() - start;
+ System.out.println(q + " time limited collector matched " + hc.numMatches
+ + " docs in \t" + diff + " ms");
+ }
+
+ private static void runQueryTimeLimitedReader(Query q, IndexReader r)
+ throws IOException, ParseException {
+ long start = System.currentTimeMillis();
+ IndexSearcher s = new IndexSearcher(r);
+ MyCollector hc = new MyCollector();
+ s.search(q, hc);
+ long diff = System.currentTimeMillis() - start;
+ System.out.println(q + " time limited reader matched " + hc.numMatches
+ + " docs in \t" + diff + " ms");
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ IndexReader r = IndexReader.open(FSDirectory.open(new File(
+ "D:/indexes/wikipedia")), true);
+ // IndexReader
+ // r=IndexReader.open("/Users/Mark/Work/indexes/wikipediaNov2008");
+ TimeLimitedIndexReader tlir = new TimeLimitedIndexReader(r);
+
+ // Warmup
+ runHeavyTermEnumTermDocsAccess(r, "no timeout limit (warm up)");
+ runHeavyTermEnumTermDocsAccess(r, "no timeout limit (warm up)");
+
+ runHeavyTermEnumTermDocsAccess(r, "no timeout limit");
+ ActivityTimeMonitor.start(30000);
+ try {
+ runHeavyTermEnumTermDocsAccess(tlir, " reader-limited access");
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+
+ // ======================
+ int maxTimeForFuzzyQueryFromHell = 4000;
+ QueryParser qp = new QueryParser(Version.LUCENE_CURRENT, "text",
+ new WhitespaceAnalyzer(Version.LUCENE_CURRENT));
+ Query q = qp.parse("f* AND a* AND b*");
+
+ runQueryNoTimeoutProtection(q, r);
+ runQueryTimeLimitedCollector(q, r, maxTimeForFuzzyQueryFromHell);
+ ActivityTimeMonitor.start(maxTimeForFuzzyQueryFromHell);
+ try {
+ runQueryTimeLimitedReader(q, tlir);
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+ System.out.println();
+
+ q = qp.parse("accomodation~");
+ runQueryNoTimeoutProtection(q, r);
+ // can timeout significantly later than desired because cost is in fuzzy
+ // analysis, not collection
+ runQueryTimeLimitedCollector(q, r, maxTimeForFuzzyQueryFromHell);
+
+ ActivityTimeMonitor.start(maxTimeForFuzzyQueryFromHell);
+ try {
+ // Times out much more readily
+ runQueryTimeLimitedReader(q, tlir);
+ } finally {
+ ActivityTimeMonitor.stop();
+ }
+
+ }
+
+}
Property changes on: src\test\org\apache\lucene\index\TimeLimitedIndexReaderBenchmark.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
+ * ActivityTimeMonitor.start(100);
+ * try {
+ * ... do some activity ...
+ * } finally {
+ * ActivityTimeMonitor.stop();
+ * }
+ *