Index: contrib/benchmark/CHANGES.txt
===================================================================
--- contrib/benchmark/CHANGES.txt (revision 835818)
+++ contrib/benchmark/CHANGES.txt (working copy)
@@ -4,6 +4,16 @@
$Id:$
+11/13/2009
+ LUCENE-2050: Added ability to run tasks within a serial sequence in
+ the background, by appending "&". The tasks are stopped & joined at
+ the end of the sequence. Also added Wait and RollbackIndex tasks.
+ Genericized NearRealTimeReaderTask to only reopen the reader
+ (previously it spawned its own thread, and also did searching).
+ Also changed the API of PerfRunData.getIndexReader: it now returns a
+ reference, and it's your job to decRef the reader when you're done
+ using it. (Mike McCandless)
+
11/12/2009
LUCENE-2059: allow TrecContentSource not to change the docname.
Previously, it would always append the iteration # to the docname.
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java (working copy)
@@ -186,6 +186,19 @@
currSequence = seq2;
colonOk = false;
break;
+
+ case '&' :
+ if (currSequence.isParallel()) {
+ throw new Exception("Can only create background tasks within a serial task");
+ }
+ if (prevTask == null) {
+ throw new Exception("& was unexpected");
+ } else if (prevTask.getRunInBackground()) {
+ throw new Exception("double & was unexpected");
+ } else {
+ prevTask.setRunInBackground();
+ }
+ break;
case '>' :
currSequence.setNoChildReport();
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java (working copy)
@@ -43,12 +43,13 @@
@Override
public int doLogic() throws Exception {
Directory dir = getRunData().getDirectory();
- Config config = getRunData().getConfig();
IndexReader r = null;
if (userData == null)
r = IndexReader.open(dir, true);
else
- r = OpenReaderTask.openCommitPoint(userData, dir, config, true);
+ r = IndexReader.open(OpenReaderTask.findIndexCommit(dir, userData),
+ null,
+ true);
System.out.println("--> numDocs:"+r.numDocs()+" dels:"+r.numDeletedDocs());
r.close();
return 1;
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java (working copy)
@@ -54,6 +54,7 @@
IndexReader r = getRunData().getIndexReader();
if (r != null) {
r.commit(commitUserData);
+ r.decRef();
} else {
throw new IllegalStateException("neither IndexWriter nor IndexReader is currently open");
}
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/RollbackIndexTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/RollbackIndexTask.java (revision 0)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/RollbackIndexTask.java (revision 0)
@@ -0,0 +1,52 @@
+package org.apache.lucene.benchmark.byTask.tasks;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.lucene.benchmark.byTask.PerfRunData;
+import org.apache.lucene.index.IndexWriter;
+
+/**
+ * Rollback the index writer.
+ */
+public class RollbackIndexTask extends PerfTask {
+
+ public RollbackIndexTask(PerfRunData runData) {
+ super(runData);
+ }
+
+ boolean doWait = true;
+
+ @Override
+ public int doLogic() throws IOException {
+ IndexWriter iw = getRunData().getIndexWriter();
+ if (iw != null) {
+ // If infoStream was set to output to a file, close it.
+ PrintStream infoStream = iw.getInfoStream();
+ if (infoStream != null && infoStream != System.out
+ && infoStream != System.err) {
+ infoStream.close();
+ }
+ iw.rollback();
+ getRunData().setIndexWriter(null);
+ }
+ return 1;
+ }
+}
Property changes on: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/RollbackIndexTask.java
___________________________________________________________________
Added: svn:eol-style
+ native
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java (working copy)
@@ -20,6 +20,7 @@
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.LogMergePolicy;
import java.io.IOException;
@@ -30,6 +31,11 @@
*
Other side effects: index writer object in perfRunData is set.
*
Relevant properties: merge.factor, max.buffered,
* max.field.length, ram.flush.mb [default 0].
+ *
+ *
Accepts a param specifying the commit point as + * previously saved with CommitIndexTask. If you specify + * this, it rolls the index back to that commit on opening + * the IndexWriter. */ public class OpenIndexTask extends PerfTask { @@ -37,6 +43,7 @@ public static final int DEFAULT_MAX_FIELD_LENGTH = IndexWriter.DEFAULT_MAX_FIELD_LENGTH; public static final int DEFAULT_MERGE_PFACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR; public static final double DEFAULT_RAM_FLUSH_MB = (int) IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB; + private String commitUserData; public OpenIndexTask(PerfRunData runData) { super(runData); @@ -46,12 +53,34 @@ public int doLogic() throws IOException { PerfRunData runData = getRunData(); Config config = runData.getConfig(); + final IndexCommit ic; + if (commitUserData != null) { + ic = OpenReaderTask.findIndexCommit(runData.getDirectory(), commitUserData); + } else { + ic = null; + } + IndexWriter writer = new IndexWriter(runData.getDirectory(), runData.getAnalyzer(), - false, - IndexWriter.MaxFieldLength.UNLIMITED); + CreateIndexTask.getIndexDeletionPolicy(config), + IndexWriter.MaxFieldLength.UNLIMITED, + ic); CreateIndexTask.setIndexWriterConfig(writer, config); runData.setIndexWriter(writer); return 1; } + + @Override + public void setParams(String params) { + super.setParams(params); + if (params != null) { + // specifies which commit point to open + commitUserData = params; + } + } + + @Override + public boolean supportsParams() { + return true; + } } Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (revision 835818) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (working copy) @@ -127,11 +127,10 @@ PerfRunData runData = getRunData(); Config config = runData.getConfig(); - IndexDeletionPolicy indexDeletionPolicy = getIndexDeletionPolicy(config); - IndexWriter writer = new IndexWriter(runData.getDirectory(), runData.getAnalyzer(), - true, indexDeletionPolicy, + true, + getIndexDeletionPolicy(config), IndexWriter.MaxFieldLength.LIMITED); setIndexWriterConfig(writer, config); runData.setIndexWriter(writer); Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java (revision 835818) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java (working copy) @@ -17,18 +17,9 @@ * limitations under the License. */ -import java.io.IOException; - import org.apache.lucene.benchmark.byTask.PerfRunData; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.SortField; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.TopFieldDocs; -import org.apache.lucene.index.Term; /** * Spawns a BG thread that periodically (defaults to 3.0 @@ -43,97 +34,67 @@ */ public class NearRealtimeReaderTask extends PerfTask { - ReopenThread t; - float pauseSec = 3.0f; + long pauseMSec = 3000L; - private static class ReopenThread extends Thread { + public NearRealtimeReaderTask(PerfRunData runData) { + super(runData); + } - final IndexWriter writer; - final int pauseMsec; + @Override + public int doLogic() throws Exception { - public volatile boolean done; + final PerfRunData runData = getRunData(); - ReopenThread(IndexWriter writer, float pauseSec) { - this.writer = writer; - this.pauseMsec = (int) (1000*pauseSec); - setDaemon(true); + // Get initial reader + IndexWriter w = runData.getIndexWriter(); + if (w == null) { + throw new RuntimeException("please open the writer before invoking NearRealtimeReader"); } - @Override - public void run() { + if (runData.getIndexReader() != null) { + throw new RuntimeException("please close the existing reader before invoking NearRealtimeReader"); + } + + long t = System.currentTimeMillis(); + IndexReader r = w.getReader(); + runData.setIndexReader(r); + // Transfer our reference to runData + r.decRef(); - IndexReader reader = null; + // TODO: gather basic metrics for reporting -- eg mean, + // stddev, min/max reopen latencies - final Query query = new TermQuery(new Term("body", "1")); - final SortField sf = new SortField("docdate", SortField.LONG); - final Sort sort = new Sort(sf); + // Parent sequence sets stopNow + int reopenCount = 0; + while(!stopNow) { + long waitForMsec = (long) (pauseMSec - (System.currentTimeMillis() - t)); + if (waitForMsec > 0) { + Thread.sleep(waitForMsec); + } - try { - while(!done) { - final long t0 = System.currentTimeMillis(); - if (reader == null) { - reader = writer.getReader(); - } else { - final IndexReader newReader = reader.reopen(); - if (reader != newReader) { - reader.close(); - reader = newReader; - } - } - - final long t1 = System.currentTimeMillis(); - final TopFieldDocs hits = new IndexSearcher(reader).search(query, null, 10, sort); - final long t2 = System.currentTimeMillis(); - System.out.println("nrt: open " + (t1-t0) + " msec; search " + (t2-t1) + " msec, " + hits.totalHits + - " results; " + reader.numDocs() + " docs"); - - final long t4 = System.currentTimeMillis(); - final int delay = (int) (pauseMsec - (t4-t0)); - if (delay > 0) { - try { - Thread.sleep(delay); - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } - } - } - } catch (Exception e) { - throw new RuntimeException(e); + t = System.currentTimeMillis(); + final IndexReader newReader = r.reopen(); + if (r != newReader) { + // TODO: somehow we need to enable warming, here + runData.setIndexReader(newReader); + // Transfer our reference to runData + newReader.decRef(); + r = newReader; + reopenCount++; } } - } - public NearRealtimeReaderTask(PerfRunData runData) { - super(runData); + return reopenCount; } @Override - public int doLogic() throws IOException { - if (t == null) { - IndexWriter w = getRunData().getIndexWriter(); - t = new ReopenThread(w, pauseSec); - t.start(); - } - return 1; - } - - @Override public void setParams(String params) { super.setParams(params); - pauseSec = Float.parseFloat(params); + pauseMSec = (long) (1000.0*Float.parseFloat(params)); } @Override public boolean supportsParams() { return true; } - - // Close the thread - @Override - public void close() throws InterruptedException { - if (t != null) { - t.done = true; - t.join(); - } - } } Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java (revision 835818) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java (working copy) @@ -59,7 +59,9 @@ private int maxDepthLogStart = 0; private boolean disableCounting = false; protected String params = null; - + + private boolean runInBackground; + protected static final String NEW_LINE = System.getProperty("line.separator"); /** Should not be used externally */ @@ -69,7 +71,21 @@ name = name.substring(0, name.length() - 4); } } - + + public void setRunInBackground() { + runInBackground = true; + } + + public boolean getRunInBackground() { + return runInBackground; + } + + protected volatile boolean stopNow; + + public void stopNow() { + stopNow = true; + } + public PerfTask(PerfRunData runData) { this(); this.runData = runData; @@ -112,9 +128,6 @@ * @return number of work items done by this task. */ public final int runAndMaybeStats(boolean reportStats) throws Exception { - if (reportStats && depth <= maxDepthLogStart && !shouldNeverLogAtStart()) { - System.out.println("------------> starting task: " + getName()); - } if (!reportStats || shouldNotRecordStats()) { setup(); int count = doLogic(); @@ -122,9 +135,12 @@ tearDown(); return count; } + if (reportStats && depth <= maxDepthLogStart && !shouldNeverLogAtStart()) { + System.out.println("------------> starting task: " + getName()); + } setup(); Points pnts = runData.getPoints(); - TaskStats ts = pnts.markTaskStart(this,runData.getConfig().getRoundNumber()); + TaskStats ts = pnts.markTaskStart(this, runData.getConfig().getRoundNumber()); int count = doLogic(); count = disableCounting ? 0 : count; pnts.markTaskEnd(ts, count); @@ -197,6 +213,9 @@ sb.append('-'); } sb.append(getName()); + if (getRunInBackground()) { + sb.append(" &"); + } return sb.toString(); } Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/WaitTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/WaitTask.java (revision 0) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/WaitTask.java (revision 0) @@ -0,0 +1,75 @@ +package org.apache.lucene.benchmark.byTask.tasks; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.benchmark.byTask.PerfRunData; + +/** + * Simply waits for the specified (via the parameter) amount + * of time. For example Wait(30s) waits for 30 seconds. + * This is useful with background tasks to control how long + * the tasks run. + * + *
You can specify h, m, or s (hours, minutes, seconds) as + *the trailing time unit. No unit is interpreted as + *seconds.
+ */ +public class WaitTask extends PerfTask { + + private double waitTimeSec; + + public WaitTask(PerfRunData runData) { + super(runData); + } + + @Override + public void setParams(String params) { + super.setParams(params); + if (params != null) { + int multiplier; + if (params.endsWith("s")) { + multiplier = 1; + params = params.substring(0, params.length()-1); + } else if (params.endsWith("m")) { + multiplier = 60; + params = params.substring(0, params.length()-1); + } else if (params.endsWith("h")) { + multiplier = 3600; + params = params.substring(0, params.length()-1); + } else { + // Assume seconds + multiplier = 1; + } + + waitTimeSec = Double.parseDouble(params) * multiplier; + } else { + throw new IllegalArgumentException("you must specify the wait time, eg: 10.0s, 4.5m, 2h"); + } + } + + @Override + public int doLogic() throws Exception { + Thread.sleep((long) (1000*waitTimeSec)); + return 0; + } + + @Override + public boolean supportsParams() { + return true; + } +} Property changes on: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/WaitTask.java ___________________________________________________________________ Added: svn:eol-style + native Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java (revision 835818) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java (working copy) @@ -21,6 +21,7 @@ import org.apache.lucene.benchmark.byTask.feeds.DocMaker; import org.apache.lucene.document.Document; import org.apache.lucene.index.Term; +import org.apache.lucene.index.IndexWriter; /** * Update a document, using IndexWriter.updateDocument, @@ -62,7 +63,8 @@ if (docID == null) { throw new IllegalStateException("document must define the docid field"); } - getRunData().getIndexWriter().updateDocument(new Term(DocMaker.ID_FIELD, docID), doc); + final IndexWriter iw = getRunData().getIndexWriter(); + iw.updateDocument(new Term(DocMaker.ID_FIELD, docID), doc); return 1; } Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java (revision 835818) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java (working copy) @@ -33,13 +33,13 @@ @Override public int doLogic() throws IOException { - IndexReader ir = getRunData().getIndexReader(); - IndexReader or = ir; - IndexReader nr = ir.reopen(); - if(nr != or) { + IndexReader r = getRunData().getIndexReader(); + IndexReader nr = r.reopen(); + if (nr != r) { getRunData().setIndexReader(nr); - or.close(); + nr.decRef(); } + r.decRef(); return 1; } } Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java (revision 835818) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java (working copy) @@ -88,6 +88,7 @@ termDocs.close(); } System.out.println("--> processed (delete) " + numDeleted + " docs"); + r.decRef(); return numDeleted; } } Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java (revision 835818) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java (working copy) @@ -47,35 +47,27 @@ Directory dir = getRunData().getDirectory(); Config config = getRunData().getConfig(); IndexReader r = null; + final IndexDeletionPolicy deletionPolicy; + if (readOnly) { + deletionPolicy = null; + } else { + deletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config); + } if (commitUserData != null) { - r = openCommitPoint(commitUserData, dir, config, readOnly); + r = IndexReader.open(OpenReaderTask.findIndexCommit(dir, commitUserData), + deletionPolicy, + readOnly); } else { - IndexDeletionPolicy indexDeletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config); - r = IndexReader.open(dir, indexDeletionPolicy, readOnly); + r = IndexReader.open(dir, + deletionPolicy, + readOnly); } getRunData().setIndexReader(r); + // We transfer reference to the run data + r.decRef(); return 1; } - public static IndexReader openCommitPoint(String userData, Directory dir, Config config, boolean readOnly) throws IOException { - IndexReader r = null; - Collection