Index: src/java/org/apache/lucene/search/ParallelMultiSearcher.java =================================================================== --- src/java/org/apache/lucene/search/ParallelMultiSearcher.java (revision 833527) +++ src/java/org/apache/lucene/search/ParallelMultiSearcher.java (working copy) @@ -19,14 +19,16 @@ import java.io.IOException; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.PriorityQueue; -/** Implements parallel search over a set of Searchables. - * - *

Applications usually need only call the inherited {@link #search(Query)} - * or {@link #search(Query,Filter)} methods. +/** + * Implements parallel search over a set of Searchables. + * + *

+ * Applications usually need only call the inherited {@link #search(Query)} or + * {@link #search(Query,Filter)} methods. */ public class ParallelMultiSearcher extends MultiSearcher { @@ -40,28 +42,56 @@ this.starts = getStarts(); } - /** - * TODO: parallelize this one too - */ + @Override public int docFreq(Term term) throws IOException { - return super.docFreq(term); + int docFreq = 0; + + MultiDocFreqThread[] mdft_ary = new MultiDocFreqThread[searchables.length]; + for (int i = 0; i < searchables.length; i++) { + // create thread + mdft_ary[i] = new MultiDocFreqThread(searchables[i], term, + "MultiDocFreq thread #" + i); + + // start threads + mdft_ary[i].start(); + } + + // aggregate results + for (int i = 0; i < searchables.length; i++) { + try { + mdft_ary[i].join(); + } catch (InterruptedException ie) { + // In 3.0 we will change this to throw + // InterruptedException instead + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + IOException ioe = mdft_ary[i].getIOException(); + if (ioe == null) { + docFreq += mdft_ary[i].docFreq(); + } else { + // if one docFreq call produced an IOException, rethrow it + throw ioe; + } + } + + return docFreq; } /** - * A search implementation which spans a new thread for each - * Searchable, waits for each search to complete and merge - * the results back together. + * A search implementation which spawns a new thread for each Searchable, waits + * for each search to complete and merges the results back together. 
*/ @Override public TopDocs search(Weight weight, Filter filter, int nDocs) - throws IOException { + throws IOException { HitQueue hq = new HitQueue(nDocs, false); int totalHits = 0; - MultiSearcherThread[] msta = - new MultiSearcherThread[searchables.length]; + MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length]; for (int i = 0; i < searchables.length; i++) { // search each searchable - // Assume not too many searchables and cost of creating a thread is by far inferior to a search + // Assume not too many searchables and cost of creating a thread is by far + // inferior to a search msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs, hq, i, starts, "MultiSearcher thread #" + (i + 1)); msta[i].start(); @@ -86,35 +116,38 @@ } ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()]; - for (int i = hq.size() - 1; i >= 0; i--) // put docs in array + for (int i = hq.size() - 1; i >= 0; i--) + // put docs in array scoreDocs[i] = hq.pop(); - float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score; - + float maxScore = (totalHits == 0) ? Float.NEGATIVE_INFINITY + : scoreDocs[0].score; + return new TopDocs(totalHits, scoreDocs, maxScore); } /** * A search implementation allowing sorting which spans a new thread for each - * Searchable, waits for each search to complete and merges - * the results back together. + * Searchable, waits for each search to complete and merges the results back + * together. 
*/ @Override public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort) - throws IOException { + throws IOException { // don't specify the fields - we'll wait to do this until we get results - FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs); + FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(null, nDocs); int totalHits = 0; MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length]; for (int i = 0; i < searchables.length; i++) { // search each searchable - // Assume not too many searchables and cost of creating a thread is by far inferior to a search + // Assume not too many searchables and cost of creating a thread is by far + // inferior to a search msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs, hq, sort, i, starts, "MultiSearcher thread #" + (i + 1)); msta[i].start(); } - float maxScore=Float.NEGATIVE_INFINITY; - + float maxScore = Float.NEGATIVE_INFINITY; + for (int i = 0; i < searchables.length; i++) { try { msta[i].join(); @@ -127,7 +160,7 @@ IOException ioe = msta[i].getIOException(); if (ioe == null) { totalHits += msta[i].hits(); - maxScore=Math.max(maxScore, msta[i].getMaxScore()); + maxScore = Math.max(maxScore, msta[i].getMaxScore()); } else { // if one search produced an IOException, rethrow it throw ioe; @@ -135,63 +168,72 @@ } ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()]; - for (int i = hq.size() - 1; i >= 0; i--) // put docs in array + for (int i = hq.size() - 1; i >= 0; i--) + // put docs in array scoreDocs[i] = hq.pop(); return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore); } - /** Lower-level search API. - * - *

{@link Collector#collect(int)} is called for every matching document. - * - *

Applications should only use this if they need all of the - * matching documents. The high-level search API ({@link - * Searcher#search(Query)}) is usually more efficient, as it skips - * non-high-scoring hits. - * - * @param weight to match documents - * @param filter if non-null, a bitset used to eliminate some documents - * @param collector to receive hits - * - * TODO: parallelize this one too - */ + /** + * Lower-level search API. + * + *

+ * {@link Collector#collect(int)} is called for every matching document. + * + *

+ * Applications should only use this if they need all of the matching + * documents. The high-level search API ({@link Searcher#search(Query)}) is + * usually more efficient, as it skips non-high-scoring hits. + * + * @param weight + * to match documents + * @param filter + * if non-null, a bitset used to eliminate some documents + * @param collector + * to receive hits + * + * TODO: parallelize this one too + */ @Override public void search(Weight weight, Filter filter, final Collector collector) - throws IOException { - for (int i = 0; i < searchables.length; i++) { + throws IOException { + for (int i = 0; i < searchables.length; i++) { - final int start = starts[i]; + final int start = starts[i]; - final Collector hc = new Collector() { - @Override - public void setScorer(Scorer scorer) throws IOException { - collector.setScorer(scorer); - } - - @Override - public void collect(int doc) throws IOException { - collector.collect(doc); - } - - @Override - public void setNextReader(IndexReader reader, int docBase) throws IOException { - collector.setNextReader(reader, start + docBase); - } - - @Override - public boolean acceptsDocsOutOfOrder() { - return collector.acceptsDocsOutOfOrder(); - } - }; - - searchables[i].search(weight, filter, hc); - } + final Collector hc = new Collector() { + @Override + public void setScorer(Scorer scorer) throws IOException { + collector.setScorer(scorer); + } + + @Override + public void collect(int doc) throws IOException { + collector.collect(doc); + } + + @Override + public void setNextReader(IndexReader reader, int docBase) + throws IOException { + collector.setNextReader(reader, start + docBase); + } + + @Override + public boolean acceptsDocsOutOfOrder() { + return collector.acceptsDocsOutOfOrder(); + } + }; + + searchables[i].search(weight, filter, hc); + } } /* * TODO: this one could be parallelized too - * @see org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query) + * + * @see + * 
org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query) */ @Override public Query rewrite(Query original) throws IOException { @@ -201,7 +243,7 @@ } /** - * A thread subclass for searching a single searchable + * A thread subclass for searching a single searchable */ class MultiSearcherThread extends Thread { @@ -216,8 +258,8 @@ private IOException ioe; private Sort sort; - public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter, - int nDocs, HitQueue hq, int i, int[] starts, String name) { + public MultiSearcherThread(Searchable searchable, Weight weight, + Filter filter, int nDocs, HitQueue hq, int i, int[] starts, String name) { super(name); this.searchable = searchable; this.weight = weight; @@ -243,11 +285,11 @@ } @Override - @SuppressWarnings ("unchecked") + @SuppressWarnings("unchecked") public void run() { try { - docs = (sort == null) ? searchable.search (weight, filter, nDocs) - : searchable.search (weight, filter, nDocs, sort); + docs = (sort == null) ? searchable.search(weight, filter, nDocs) + : searchable.search(weight, filter, nDocs, sort); } // Store the IOException for later use by the caller of this thread catch (IOException ioe) { @@ -256,7 +298,8 @@ if (ioe == null) { if (sort != null) { TopFieldDocs docsFields = (TopFieldDocs) docs; - // If one of the Sort fields is FIELD_DOC, need to fix its values, so that + // If one of the Sort fields is FIELD_DOC, need to fix its values, so + // that // it will break ties by doc Id properly. Otherwise, it will compare to // 'relative' doc Ids, that belong to two different searchables. 
for (int j = 0; j < docsFields.fields.length; j++) { @@ -264,7 +307,9 @@ // iterate over the score docs and change their fields value for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) { FieldDoc fd = (FieldDoc) docs.scoreDocs[j2]; - fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]); + fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]) + .intValue() + + starts[i]); } break; } @@ -273,16 +318,15 @@ ((FieldDocSortedHitQueue) hq).setFields(docsFields.fields); } ScoreDoc[] scoreDocs = docs.scoreDocs; - for (int j = 0; - j < scoreDocs.length; - j++) { // merge scoreDocs into hq + for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq ScoreDoc scoreDoc = scoreDocs[j]; - scoreDoc.doc += starts[i]; // convert doc - //it would be so nice if we had a thread-safe insert + scoreDoc.doc += starts[i]; // convert doc + // it would be so nice if we had a thread-safe insert synchronized (hq) { // this cast is bad, because we assume that the list has correct type. 
// Because of that we have the @SuppressWarnings :-( - if (scoreDoc == ((PriorityQueue) hq).insertWithOverflow(scoreDoc)) + if (scoreDoc == ((PriorityQueue) hq) + .insertWithOverflow(scoreDoc)) break; } // no more scores > minScore } @@ -294,11 +338,45 @@ } public float getMaxScore() { - return docs.getMaxScore(); + return docs.getMaxScore(); } - + public IOException getIOException() { return ioe; } } + +/** + * A thread subclass that computes the document frequency of a term on a single searchable + */ +class MultiDocFreqThread extends Thread { + private Term term; + private Searchable searchable; + private IOException ioe; + + private int docFreq; + + public MultiDocFreqThread(Searchable searchable, Term term, String name) { + super(name); + this.searchable = searchable; + this.term = term; + } + + public void run() { + try { + this.docFreq = searchable.docFreq(term); + } catch (IOException ioe) { + this.ioe = ioe; + } + } + + public int docFreq() { + return this.docFreq; + } + + public IOException getIOException() { + return ioe; + } + +}