Index: contrib/benchmark/CHANGES.txt
===================================================================
--- contrib/benchmark/CHANGES.txt (revision 835818)
+++ contrib/benchmark/CHANGES.txt (working copy)
@@ -4,6 +4,16 @@
$Id:$
+11/13/2009
+ LUCENE-2050: Added ability to run tasks within a serial sequence in
+ the background, by appending "&". The tasks are stopped & joined at
+ the end of the sequence. Also added Wait and RollbackIndex tasks.
+ Genericized NearRealTimeReaderTask to only reopen the reader
+ (previously it spawned its own thread, and also did searching).
+ Also changed the API of PerfRunData.getIndexReader: it now returns a
+ reference, and it's your job to decRef the reader when you're done
+ using it. (Mike McCandless)
+
11/12/2009
LUCENE-2059: allow TrecContentSource not to change the docname.
Previously, it would always append the iteration # to the docname.
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java (working copy)
@@ -186,6 +186,19 @@
currSequence = seq2;
colonOk = false;
break;
+
+ case '&' :
+ if (currSequence.isParallel()) {
+ throw new Exception("Can only create background tasks within a serial task");
+ }
+ if (prevTask == null) {
+ throw new Exception("& was unexpected");
+ } else if (prevTask.getRunInBackground()) {
+ throw new Exception("double & was unexpected");
+ } else {
+ prevTask.setRunInBackground();
+ }
+ break;
case '>' :
currSequence.setNoChildReport();
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java (working copy)
@@ -43,12 +43,13 @@
@Override
public int doLogic() throws Exception {
Directory dir = getRunData().getDirectory();
- Config config = getRunData().getConfig();
IndexReader r = null;
if (userData == null)
r = IndexReader.open(dir, true);
else
- r = OpenReaderTask.openCommitPoint(userData, dir, config, true);
+ r = IndexReader.open(OpenReaderTask.findIndexCommit(dir, userData),
+ null,
+ true);
System.out.println("--> numDocs:"+r.numDocs()+" dels:"+r.numDeletedDocs());
r.close();
return 1;
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java (working copy)
@@ -54,6 +54,7 @@
IndexReader r = getRunData().getIndexReader();
if (r != null) {
r.commit(commitUserData);
+ r.decRef();
} else {
throw new IllegalStateException("neither IndexWriter nor IndexReader is currently open");
}
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java (working copy)
@@ -20,6 +20,7 @@
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.LogMergePolicy;
import java.io.IOException;
@@ -30,6 +31,11 @@
*
Other side effects: index writer object in perfRunData is set.
*
Relevant properties: merge.factor, max.buffered,
* max.field.length, ram.flush.mb [default 0].
+ *
+ *
Accepts a param specifying the commit point as
+ * previously saved with CommitIndexTask. If you specify
+ * this, it rolls the index back to that commit on opening
+ * the IndexWriter.
*/
public class OpenIndexTask extends PerfTask {
@@ -37,6 +43,7 @@
public static final int DEFAULT_MAX_FIELD_LENGTH = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
public static final int DEFAULT_MERGE_PFACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR;
public static final double DEFAULT_RAM_FLUSH_MB = (int) IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB;
+ private String commitUserData;
public OpenIndexTask(PerfRunData runData) {
super(runData);
@@ -46,12 +53,34 @@
public int doLogic() throws IOException {
PerfRunData runData = getRunData();
Config config = runData.getConfig();
+ final IndexCommit ic;
+ if (commitUserData != null) {
+ ic = OpenReaderTask.findIndexCommit(runData.getDirectory(), commitUserData);
+ } else {
+ ic = null;
+ }
+
IndexWriter writer = new IndexWriter(runData.getDirectory(),
runData.getAnalyzer(),
- false,
- IndexWriter.MaxFieldLength.UNLIMITED);
+ CreateIndexTask.getIndexDeletionPolicy(config),
+ IndexWriter.MaxFieldLength.UNLIMITED,
+ ic);
CreateIndexTask.setIndexWriterConfig(writer, config);
runData.setIndexWriter(writer);
return 1;
}
+
+ @Override
+ public void setParams(String params) {
+ super.setParams(params);
+ if (params != null) {
+ // specifies which commit point to open
+ commitUserData = params;
+ }
+ }
+
+ @Override
+ public boolean supportsParams() {
+ return true;
+ }
}
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (working copy)
@@ -127,11 +127,10 @@
PerfRunData runData = getRunData();
Config config = runData.getConfig();
- IndexDeletionPolicy indexDeletionPolicy = getIndexDeletionPolicy(config);
-
IndexWriter writer = new IndexWriter(runData.getDirectory(),
runData.getAnalyzer(),
- true, indexDeletionPolicy,
+ true,
+ getIndexDeletionPolicy(config),
IndexWriter.MaxFieldLength.LIMITED);
setIndexWriterConfig(writer, config);
runData.setIndexWriter(writer);
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java (working copy)
@@ -17,18 +17,9 @@
* limitations under the License.
*/
-import java.io.IOException;
-
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TopFieldDocs;
-import org.apache.lucene.index.Term;
/**
* Spawns a BG thread that periodically (defaults to 3.0
@@ -43,97 +34,67 @@
*/
public class NearRealtimeReaderTask extends PerfTask {
- ReopenThread t;
- float pauseSec = 3.0f;
+ long pauseMSec = 3000L;
- private static class ReopenThread extends Thread {
+ public NearRealtimeReaderTask(PerfRunData runData) {
+ super(runData);
+ }
- final IndexWriter writer;
- final int pauseMsec;
+ @Override
+ public int doLogic() throws Exception {
- public volatile boolean done;
+ final PerfRunData runData = getRunData();
- ReopenThread(IndexWriter writer, float pauseSec) {
- this.writer = writer;
- this.pauseMsec = (int) (1000*pauseSec);
- setDaemon(true);
+ // Get initial reader
+ IndexWriter w = runData.getIndexWriter();
+ if (w == null) {
+ throw new RuntimeException("please open the writer before invoking NearRealtimeReader");
}
- @Override
- public void run() {
+ if (runData.getIndexReader() != null) {
+ throw new RuntimeException("please close the existing reader before invoking NearRealtimeReader");
+ }
+
+ long t = System.currentTimeMillis();
+ IndexReader r = w.getReader();
+ runData.setIndexReader(r);
+ // Transfer our reference to runData
+ r.decRef();
- IndexReader reader = null;
+ // TODO: gather basic metrics for reporting -- eg mean,
+ // stddev, min/max reopen latencies
- final Query query = new TermQuery(new Term("body", "1"));
- final SortField sf = new SortField("docdate", SortField.LONG);
- final Sort sort = new Sort(sf);
+ // Parent sequence sets stopNow
+ int reopenCount = 0;
+ while(!stopNow) {
+ long waitForMsec = (long) (pauseMSec - (System.currentTimeMillis() - t));
+ if (waitForMsec > 0) {
+ Thread.sleep(waitForMsec);
+ }
- try {
- while(!done) {
- final long t0 = System.currentTimeMillis();
- if (reader == null) {
- reader = writer.getReader();
- } else {
- final IndexReader newReader = reader.reopen();
- if (reader != newReader) {
- reader.close();
- reader = newReader;
- }
- }
-
- final long t1 = System.currentTimeMillis();
- final TopFieldDocs hits = new IndexSearcher(reader).search(query, null, 10, sort);
- final long t2 = System.currentTimeMillis();
- System.out.println("nrt: open " + (t1-t0) + " msec; search " + (t2-t1) + " msec, " + hits.totalHits +
- " results; " + reader.numDocs() + " docs");
-
- final long t4 = System.currentTimeMillis();
- final int delay = (int) (pauseMsec - (t4-t0));
- if (delay > 0) {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ t = System.currentTimeMillis();
+ final IndexReader newReader = r.reopen();
+ if (r != newReader) {
+ // TODO: somehow we need to enable warming, here
+ runData.setIndexReader(newReader);
+ // Transfer our reference to runData
+ newReader.decRef();
+ r = newReader;
+ reopenCount++;
}
}
- }
- public NearRealtimeReaderTask(PerfRunData runData) {
- super(runData);
+ return reopenCount;
}
@Override
- public int doLogic() throws IOException {
- if (t == null) {
- IndexWriter w = getRunData().getIndexWriter();
- t = new ReopenThread(w, pauseSec);
- t.start();
- }
- return 1;
- }
-
- @Override
public void setParams(String params) {
super.setParams(params);
- pauseSec = Float.parseFloat(params);
+ pauseMSec = (long) (1000.0*Float.parseFloat(params));
}
@Override
public boolean supportsParams() {
return true;
}
-
- // Close the thread
- @Override
- public void close() throws InterruptedException {
- if (t != null) {
- t.done = true;
- t.join();
- }
- }
}
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java (working copy)
@@ -59,7 +59,9 @@
private int maxDepthLogStart = 0;
private boolean disableCounting = false;
protected String params = null;
-
+
+ private boolean runInBackground;
+
protected static final String NEW_LINE = System.getProperty("line.separator");
/** Should not be used externally */
@@ -69,7 +71,21 @@
name = name.substring(0, name.length() - 4);
}
}
-
+
+ public void setRunInBackground() {
+ runInBackground = true;
+ }
+
+ public boolean getRunInBackground() {
+ return runInBackground;
+ }
+
+ protected volatile boolean stopNow;
+
+ public void stopNow() {
+ stopNow = true;
+ }
+
public PerfTask(PerfRunData runData) {
this();
this.runData = runData;
@@ -112,9 +128,6 @@
* @return number of work items done by this task.
*/
public final int runAndMaybeStats(boolean reportStats) throws Exception {
- if (reportStats && depth <= maxDepthLogStart && !shouldNeverLogAtStart()) {
- System.out.println("------------> starting task: " + getName());
- }
if (!reportStats || shouldNotRecordStats()) {
setup();
int count = doLogic();
@@ -122,9 +135,12 @@
tearDown();
return count;
}
+ if (reportStats && depth <= maxDepthLogStart && !shouldNeverLogAtStart()) {
+ System.out.println("------------> starting task: " + getName());
+ }
setup();
Points pnts = runData.getPoints();
- TaskStats ts = pnts.markTaskStart(this,runData.getConfig().getRoundNumber());
+ TaskStats ts = pnts.markTaskStart(this, runData.getConfig().getRoundNumber());
int count = doLogic();
count = disableCounting ? 0 : count;
pnts.markTaskEnd(ts, count);
@@ -197,6 +213,9 @@
sb.append('-');
}
sb.append(getName());
+ if (getRunInBackground()) {
+ sb.append(" &");
+ }
return sb.toString();
}
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java (working copy)
@@ -21,6 +21,7 @@
import org.apache.lucene.benchmark.byTask.feeds.DocMaker;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexWriter;
/**
* Update a document, using IndexWriter.updateDocument,
@@ -62,7 +63,8 @@
if (docID == null) {
throw new IllegalStateException("document must define the docid field");
}
- getRunData().getIndexWriter().updateDocument(new Term(DocMaker.ID_FIELD, docID), doc);
+ final IndexWriter iw = getRunData().getIndexWriter();
+ iw.updateDocument(new Term(DocMaker.ID_FIELD, docID), doc);
return 1;
}
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java (working copy)
@@ -33,13 +33,13 @@
@Override
public int doLogic() throws IOException {
- IndexReader ir = getRunData().getIndexReader();
- IndexReader or = ir;
- IndexReader nr = ir.reopen();
- if(nr != or) {
+ IndexReader r = getRunData().getIndexReader();
+ IndexReader nr = r.reopen();
+ if (nr != r) {
getRunData().setIndexReader(nr);
- or.close();
+ nr.decRef();
}
+ r.decRef();
return 1;
}
}
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java (working copy)
@@ -88,6 +88,7 @@
termDocs.close();
}
System.out.println("--> processed (delete) " + numDeleted + " docs");
+ r.decRef();
return numDeleted;
}
}
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java (revision 835818)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java (working copy)
@@ -47,35 +47,27 @@
Directory dir = getRunData().getDirectory();
Config config = getRunData().getConfig();
IndexReader r = null;
+ final IndexDeletionPolicy deletionPolicy;
+ if (readOnly) {
+ deletionPolicy = null;
+ } else {
+ deletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config);
+ }
if (commitUserData != null) {
- r = openCommitPoint(commitUserData, dir, config, readOnly);
+ r = IndexReader.open(OpenReaderTask.findIndexCommit(dir, commitUserData),
+ deletionPolicy,
+ readOnly);
} else {
- IndexDeletionPolicy indexDeletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config);
- r = IndexReader.open(dir, indexDeletionPolicy, readOnly);
+ r = IndexReader.open(dir,
+ deletionPolicy,
+ readOnly);
}
getRunData().setIndexReader(r);
+ // We transfer reference to the run data
+ r.decRef();
return 1;
}
- public static IndexReader openCommitPoint(String userData, Directory dir, Config config, boolean readOnly) throws IOException {
- IndexReader r = null;
- Collection