Index: src/java/org/apache/lucene/search/ParallelMultiSearcher.java
===================================================================
--- src/java/org/apache/lucene/search/ParallelMultiSearcher.java (revision 833527)
+++ src/java/org/apache/lucene/search/ParallelMultiSearcher.java (working copy)
@@ -19,14 +19,16 @@
import java.io.IOException;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
-import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
-/** Implements parallel search over a set of Searchables.
- *
- * <p>Applications usually need only call the inherited {@link #search(Query)}
- * or {@link #search(Query,Filter)} methods.
+/**
+ * Implements parallel search over a set of Searchables.
+ *
+ * <p>
+ * Applications usually need only call the inherited {@link #search(Query)} or
+ * {@link #search(Query,Filter)} methods.
  */
 public class ParallelMultiSearcher extends MultiSearcher {
@@ -40,28 +42,56 @@
     this.starts = getStarts();
   }
 
-  /**
-   * TODO: parallelize this one too
-   */
+  @Override
   public int docFreq(Term term) throws IOException {
-    return super.docFreq(term);
+    int docFreq = 0;
+
+    MultiDocFreqThread[] mdft_ary = new MultiDocFreqThread[searchables.length];
+    for (int i = 0; i < searchables.length; i++) {
+      // create thread
+      mdft_ary[i] = new MultiDocFreqThread(searchables[i], term,
+          "MultiDocFreq thread #" + i);
+
+      // start threads
+      mdft_ary[i].start();
+    }
+
+    // aggregate results
+    for (int i = 0; i < searchables.length; i++) {
+      try {
+        mdft_ary[i].join();
+      } catch (InterruptedException ie) {
+        // In 3.0 we will change this to throw
+        // InterruptedException instead
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+      IOException ioe = mdft_ary[i].getIOException();
+      if (ioe == null) {
+        docFreq += mdft_ary[i].docFreq();
+      } else {
+        // if one search produced an IOException, rethrow it
+        throw ioe;
+      }
+    }
+
+    return docFreq;
   }
 
   /**
-   * A search implementation which spans a new thread for each
-   * Searchable, waits for each search to complete and merge
-   * the results back together.
+   * A search implementation which spans a new thread for each Searchable, waits
+   * for each search to complete and merge the results back together.
    */
   @Override
   public TopDocs search(Weight weight, Filter filter, int nDocs)
-    throws IOException {
+      throws IOException {
     HitQueue hq = new HitQueue(nDocs, false);
     int totalHits = 0;
-    MultiSearcherThread[] msta =
-      new MultiSearcherThread[searchables.length];
+    MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
     for (int i = 0; i < searchables.length; i++) { // search each searchable
-      // Assume not too many searchables and cost of creating a thread is by far inferior to a search
+      // Assume not too many searchables and cost of creating a thread is by far
+      // inferior to a search
       msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
           hq, i, starts, "MultiSearcher thread #" + (i + 1));
       msta[i].start();
@@ -86,35 +116,38 @@
     }
 
     ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
-    for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
+    for (int i = hq.size() - 1; i >= 0; i--)
+      // put docs in array
       scoreDocs[i] = hq.pop();
 
-    float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
-    
+    float maxScore = (totalHits == 0) ? Float.NEGATIVE_INFINITY
+        : scoreDocs[0].score;
+
     return new TopDocs(totalHits, scoreDocs, maxScore);
   }
 
   /**
    * A search implementation allowing sorting which spans a new thread for each
-   * Searchable, waits for each search to complete and merges
-   * the results back together.
+   * Searchable, waits for each search to complete and merges the results back
+   * together.
    */
   @Override
   public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort)
-    throws IOException {
+      throws IOException {
     // don't specify the fields - we'll wait to do this until we get results
-    FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs);
+    FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(null, nDocs);
     int totalHits = 0;
     MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
     for (int i = 0; i < searchables.length; i++) { // search each searchable
-      // Assume not too many searchables and cost of creating a thread is by far inferior to a search
+      // Assume not too many searchables and cost of creating a thread is by far
+      // inferior to a search
       msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
           hq, sort, i, starts, "MultiSearcher thread #" + (i + 1));
       msta[i].start();
     }
 
-    float maxScore=Float.NEGATIVE_INFINITY;
-    
+    float maxScore = Float.NEGATIVE_INFINITY;
+
     for (int i = 0; i < searchables.length; i++) {
       try {
         msta[i].join();
@@ -127,7 +160,7 @@
       IOException ioe = msta[i].getIOException();
       if (ioe == null) {
         totalHits += msta[i].hits();
-        maxScore=Math.max(maxScore, msta[i].getMaxScore());
+        maxScore = Math.max(maxScore, msta[i].getMaxScore());
       } else {
         // if one search produced an IOException, rethrow it
         throw ioe;
@@ -135,63 +168,72 @@
     }
 
     ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
-    for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
+    for (int i = hq.size() - 1; i >= 0; i--)
+      // put docs in array
       scoreDocs[i] = hq.pop();
 
     return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
   }
 
-  /** Lower-level search API.
-   *
-   * <p>{@link Collector#collect(int)} is called for every matching document.
-   *
-   * <p>Applications should only use this if they need all of the
-   * matching documents. The high-level search API ({@link
-   * Searcher#search(Query)}) is usually more efficient, as it skips
-   * non-high-scoring hits.
-   *
-   * @param weight to match documents
-   * @param filter if non-null, a bitset used to eliminate some documents
-   * @param collector to receive hits
-   *
-   * TODO: parallelize this one too
-   */
+  /**
+   * Lower-level search API.
+   *
+   * <p>
+   * {@link Collector#collect(int)} is called for every matching document.
+   *
+   * <p>
+ * Applications should only use this if they need all of the matching
+ * documents. The high-level search API ({@link Searcher#search(Query)}) is
+ * usually more efficient, as it skips non-high-scoring hits.
+ *
+ * @param weight
+ * to match documents
+ * @param filter
+ * if non-null, a bitset used to eliminate some documents
+ * @param collector
+ * to receive hits
+ *
+ * TODO: parallelize this one too
+ */
@Override
public void search(Weight weight, Filter filter, final Collector collector)
- throws IOException {
- for (int i = 0; i < searchables.length; i++) {
+ throws IOException {
+ for (int i = 0; i < searchables.length; i++) {
- final int start = starts[i];
+ final int start = starts[i];
- final Collector hc = new Collector() {
- @Override
- public void setScorer(Scorer scorer) throws IOException {
- collector.setScorer(scorer);
- }
-
- @Override
- public void collect(int doc) throws IOException {
- collector.collect(doc);
- }
-
- @Override
- public void setNextReader(IndexReader reader, int docBase) throws IOException {
- collector.setNextReader(reader, start + docBase);
- }
-
- @Override
- public boolean acceptsDocsOutOfOrder() {
- return collector.acceptsDocsOutOfOrder();
- }
- };
-
- searchables[i].search(weight, filter, hc);
- }
+ final Collector hc = new Collector() {
+ @Override
+ public void setScorer(Scorer scorer) throws IOException {
+ collector.setScorer(scorer);
+ }
+
+ @Override
+ public void collect(int doc) throws IOException {
+ collector.collect(doc);
+ }
+
+ @Override
+ public void setNextReader(IndexReader reader, int docBase)
+ throws IOException {
+ collector.setNextReader(reader, start + docBase);
+ }
+
+ @Override
+ public boolean acceptsDocsOutOfOrder() {
+ return collector.acceptsDocsOutOfOrder();
+ }
+ };
+
+ searchables[i].search(weight, filter, hc);
+ }
}
/*
* TODO: this one could be parallelized too
- * @see org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query)
+ *
+ * @see
+ * org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query)
*/
@Override
public Query rewrite(Query original) throws IOException {
@@ -201,7 +243,7 @@
}
/**
- * A thread subclass for searching a single searchable
+ * A thread subclass for searching a single searchable
*/
class MultiSearcherThread extends Thread {
@@ -216,8 +258,8 @@
private IOException ioe;
private Sort sort;
- public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter,
- int nDocs, HitQueue hq, int i, int[] starts, String name) {
+ public MultiSearcherThread(Searchable searchable, Weight weight,
+ Filter filter, int nDocs, HitQueue hq, int i, int[] starts, String name) {
super(name);
this.searchable = searchable;
this.weight = weight;
@@ -243,11 +285,11 @@
}
@Override
- @SuppressWarnings ("unchecked")
+ @SuppressWarnings("unchecked")
public void run() {
try {
- docs = (sort == null) ? searchable.search (weight, filter, nDocs)
- : searchable.search (weight, filter, nDocs, sort);
+ docs = (sort == null) ? searchable.search(weight, filter, nDocs)
+ : searchable.search(weight, filter, nDocs, sort);
}
// Store the IOException for later use by the caller of this thread
catch (IOException ioe) {
@@ -256,7 +298,8 @@
if (ioe == null) {
if (sort != null) {
TopFieldDocs docsFields = (TopFieldDocs) docs;
- // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
+ // If one of the Sort fields is FIELD_DOC, need to fix its values, so
+ // that
// it will break ties by doc Id properly. Otherwise, it will compare to
// 'relative' doc Ids, that belong to two different searchables.
for (int j = 0; j < docsFields.fields.length; j++) {
@@ -264,7 +307,9 @@
// iterate over the score docs and change their fields value
for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
- fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
+ fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j])
+ .intValue()
+ + starts[i]);
}
break;
}
@@ -273,16 +318,15 @@
((FieldDocSortedHitQueue) hq).setFields(docsFields.fields);
}
ScoreDoc[] scoreDocs = docs.scoreDocs;
- for (int j = 0;
- j < scoreDocs.length;
- j++) { // merge scoreDocs into hq
+ for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
ScoreDoc scoreDoc = scoreDocs[j];
- scoreDoc.doc += starts[i]; // convert doc
- //it would be so nice if we had a thread-safe insert
+ scoreDoc.doc += starts[i]; // convert doc
+ // it would be so nice if we had a thread-safe insert
synchronized (hq) {
// this cast is bad, because we assume that the list has correct type.
// Because of that we have the @SuppressWarnings :-(
- if (scoreDoc == ((PriorityQueue