Index: src/java/org/apache/lucene/search/MultiSearcher.java
===================================================================
--- src/java/org/apache/lucene/search/MultiSearcher.java	(revision 724618)
+++ src/java/org/apache/lucene/search/MultiSearcher.java	(working copy)
@@ -21,6 +21,7 @@
 import org.apache.lucene.document.FieldSelector;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.util.PriorityQueue;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -193,61 +194,148 @@
   public int maxDoc() throws IOException {
     return maxDoc;
   }
+
+  static final class SubsearcherTopDocs {
+    final TopDocs topDocs;
+    final int subsearcherIdx;
+    int hitIdx;
+
+    SubsearcherTopDocs(final TopDocs topDocs, final int subsearcherIdx) {
+      this.topDocs = topDocs;
+      this.subsearcherIdx = subsearcherIdx;
+      this.hitIdx = 0;
+    }
+
+    ScoreDoc topDoc() {
+      return topDocs.scoreDocs[hitIdx];
+    }
+
+    void convertTopDoc(final int starts[]) {
+      topDoc().doc += starts[subsearcherIdx];
+    }
+
+    boolean hasNext() {
+      return hitIdx < topDocs.scoreDocs.length;
+    }
+  }
+
+  static final class SubsearcherHitQueue extends PriorityQueue {
+    SubsearcherHitQueue(int size) {
+      initialize(size);
+    }
+
+    /**
+     * Order SubsearcherTopDocs by decreasing score, then increasing docid
+     */
+    protected final boolean lessThan(Object a, Object b) {
+      final SubsearcherTopDocs subsearcherTopDocsA = (SubsearcherTopDocs)a;
+      final SubsearcherTopDocs subsearcherTopDocsB = (SubsearcherTopDocs)b;
+      final ScoreDoc hitA = subsearcherTopDocsA.topDoc();
+      final ScoreDoc hitB = subsearcherTopDocsB.topDoc();
+      if (hitA.score == hitB.score) {
+        return hitA.doc < hitB.doc;
+      } else {
+        return hitA.score > hitB.score;
+      }
+    }
+  }
+
   public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
-    HitQueue hq = new HitQueue(nDocs);
+    SubsearcherHitQueue shq = new SubsearcherHitQueue(searchables.length);
     int totalHits = 0;
 
     for (int i = 0; i < searchables.length; i++) { // search each searcher
       TopDocs docs = searchables[i].search(weight, filter, nDocs);
       totalHits += docs.totalHits;                 // update totalHits
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i];                 // convert doc
-        if(!hq.insert(scoreDoc))
-          break;                                   // no more scores > minScore
+      if (docs.totalHits > 0) {
+        final SubsearcherTopDocs subsearcherTopDocs = new SubsearcherTopDocs(docs, i);
+        subsearcherTopDocs.convertTopDoc(starts);
+        shq.put(subsearcherTopDocs);
       }
     }
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
-    for (int i = hq.size()-1; i >= 0; i--)         // put docs in array
-      scoreDocs[i] = (ScoreDoc)hq.pop();
-
-    float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
-
+    // merge top nDocs sorted results into scoreDocs
+    ScoreDoc[] scoreDocs = new ScoreDoc[Math.min(totalHits, nDocs)];
+
+    for (int i = 0; i < scoreDocs.length; i++) {
+      // using the top(), top().change(), adjustTop() optimized form
+      final SubsearcherTopDocs topSubsearcherHits = (SubsearcherTopDocs) shq.top();
+      final ScoreDoc topDoc = topSubsearcherHits.topDoc();
+      scoreDocs[i] = topDoc;
+
+      topSubsearcherHits.hitIdx++;
+
+      if (topSubsearcherHits.hasNext()) {
+        topSubsearcherHits.convertTopDoc(starts);
+        shq.adjustTop();
+      } else {
+        shq.pop();
+      }
+    }
+
+    float maxScore = (totalHits == 0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
+
     return new TopDocs(totalHits, scoreDocs, maxScore);
   }
 
+  static class SubsearcherFieldDocSortedHitQueue extends FieldDocSortedHitQueue {
+    SubsearcherFieldDocSortedHitQueue(SortField[] fields, int size) {
+      super(fields, size);
+    }
+
+    protected FieldDoc asFieldDoc (Object obj) {
+      final SubsearcherTopDocs subsearcherTopDocs = (SubsearcherTopDocs)obj;
+      final FieldDoc hit = (FieldDoc) subsearcherTopDocs.topDoc();
+      return hit;
+    }
+
+    protected boolean lessThan(Object a, Object b) {
+      boolean c = super.lessThan(a, b);
+      // invert lessThan to make a max top queue
+      return ! c;
+    }
+  }
+
   public TopFieldDocs search (Weight weight, Filter filter, int n, Sort sort) throws IOException {
-    FieldDocSortedHitQueue hq = null;
+    SubsearcherFieldDocSortedHitQueue shq = null;
     int totalHits = 0;
 
-    float maxScore=Float.NEGATIVE_INFINITY;
-
     for (int i = 0; i < searchables.length; i++) { // search each searcher
-      TopFieldDocs docs = searchables[i].search (weight, filter, n, sort);
-
-      if (hq == null) hq = new FieldDocSortedHitQueue (docs.fields, n);
+      TopFieldDocs docs = searchables[i].search(weight, filter, n, sort);
       totalHits += docs.totalHits;                 // update totalHits
-      maxScore = Math.max(maxScore, docs.getMaxScore());
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i];                 // convert doc
-        if (!hq.insert (scoreDoc))
-          break;                                   // no more scores > minScore
+      if (docs.totalHits > 0) {
+        if (shq == null) shq = new SubsearcherFieldDocSortedHitQueue (docs.fields, n);
+        final SubsearcherTopDocs subsearcherTopDocs = new SubsearcherTopDocs(docs, i);
+        subsearcherTopDocs.convertTopDoc(starts);
+        shq.put(subsearcherTopDocs);
       }
     }
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
-    for (int i = hq.size() - 1; i >= 0; i--)       // put docs in array
-      scoreDocs[i] = (ScoreDoc) hq.pop();
+    // merge top nDocs sorted results into scoreDocs
+    ScoreDoc[] scoreDocs = new ScoreDoc[Math.min(totalHits, n)];
 
-    return new TopFieldDocs (totalHits, scoreDocs, hq.getFields(), maxScore);
+    for (int i = 0; i < scoreDocs.length; i++) {
+      // using the top(), top().change(), adjustTop() optimized form
+      final SubsearcherTopDocs topSubsearcherHits = (SubsearcherTopDocs) shq.top();
+      final ScoreDoc topDoc = topSubsearcherHits.topDoc();
+      scoreDocs[i] = topDoc;
+
+      topSubsearcherHits.hitIdx++;
+
+      if (topSubsearcherHits.hasNext()) {
+        topSubsearcherHits.convertTopDoc(starts);
+        shq.adjustTop();
+      } else {
+        shq.pop();
+      }
+    }
+
+    float maxScore = (totalHits == 0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
+
+    return new TopFieldDocs(totalHits, scoreDocs, shq.getFields(), maxScore);
   }
Index: src/java/org/apache/lucene/search/ParallelMultiSearcher.java
===================================================================
--- src/java/org/apache/lucene/search/ParallelMultiSearcher.java	(revision 724618)
+++ src/java/org/apache/lucene/search/ParallelMultiSearcher.java	(working copy)
@@ -22,21 +22,21 @@
 import org.apache.lucene.index.Term;
 import org.apache.lucene.util.PriorityQueue;
 
-/** Implements parallel search over a set of <code>Searchables</code>.
+/** Implements parallel search over a set of {@link Searchable}s.
  *
  * <p>Applications usually need only call the inherited {@link #search(Query)}
  * or {@link #search(Query,Filter)} methods.
  */
 public class ParallelMultiSearcher extends MultiSearcher {
 
-  private Searchable[] searchables;
-  private int[] starts;
+  private final Searchable[] searchables;
+  private final int[] starts;
 
   /** Creates a searcher which searches searchables. */
   public ParallelMultiSearcher(Searchable[] searchables) throws IOException {
     super(searchables);
-    this.searchables=searchables;
-    this.starts=getStarts();
+    this.searchables = searchables;
+    this.starts = getStarts();
   }
 
   /**
@@ -47,105 +47,126 @@
   }
 
   /**
-   * A search implementation which spans a new thread for each
-   * Searchable, waits for each search to complete and merge
+   * A search implementation which spawns a new thread for each
+   * Searchable, waits for each search to complete and merges
    * the results back together.
    */
   public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
-    HitQueue hq = new HitQueue(nDocs);
-    int totalHits = 0;
-    MultiSearcherThread[] msta =
-      new MultiSearcherThread[searchables.length];
+    final MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
     for (int i = 0; i < searchables.length; i++) { // search each searcher
-      // Assume not too many searchables and cost of creating a thread is by far inferior to a search
-      msta[i] =
-        new MultiSearcherThread(
-          searchables[i],
-          weight,
-          filter,
-          nDocs,
-          hq,
-          i,
-          starts,
-          "MultiSearcher thread #" + (i + 1));
+      // Assume not too many searchables and cost of creating a thread is much less than a search
+      msta[i] = new MultiSearcherThread(i, weight, filter, nDocs, null /* no sort */);
       msta[i].start();
     }
 
+    SubsearcherHitQueue shq = new SubsearcherHitQueue(searchables.length);
+    int totalHits = 0;
+
     for (int i = 0; i < searchables.length; i++) {
       try {
         msta[i].join();
       } catch (InterruptedException ie) {
         ; // TODO: what should we do with this???
       }
-      IOException ioe = msta[i].getIOException();
-      if (ioe == null) {
-        totalHits += msta[i].hits();
-      } else {
+      IOException ioe = msta[i].ioe;
+      if (ioe != null) {
         // if one search produced an IOException, rethrow it
         throw ioe;
       }
+      TopDocs docs = msta[i].docs;
+      totalHits += docs.totalHits;
+      if (docs.totalHits > 0) {
+        final SubsearcherTopDocs subsearcherTopDocs = new SubsearcherTopDocs(docs, i);
+        subsearcherTopDocs.convertTopDoc(starts);
+        shq.put(subsearcherTopDocs);
+      }
     }
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
-    for (int i = hq.size() - 1; i >= 0; i--)       // put docs in array
-      scoreDocs[i] = (ScoreDoc) hq.pop();
+    // merge top nDocs sorted results into scoreDocs
+    ScoreDoc[] scoreDocs = new ScoreDoc[Math.min(totalHits, nDocs)];
 
-    float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
+    for (int i = 0; i < scoreDocs.length; i++) {
+      // using the top(), top().change(), adjustTop() optimized form
+      final SubsearcherTopDocs topSubsearcherHits = (SubsearcherTopDocs) shq.top();
+      final ScoreDoc topDoc = topSubsearcherHits.topDoc();
+      scoreDocs[i] = topDoc;
+
+      topSubsearcherHits.hitIdx++;
+
+      if (topSubsearcherHits.hasNext()) {
+        topSubsearcherHits.convertTopDoc(starts);
+        shq.adjustTop();
+      } else {
+        shq.pop();
+      }
+    }
+
+    float maxScore = (totalHits == 0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
 
     return new TopDocs(totalHits, scoreDocs, maxScore);
   }
 
   /**
-   * A search implementation allowing sorting which spans a new thread for each
+   * A search implementation allowing sorting which spawns a new thread for each
    * Searchable, waits for each search to complete and merges
    * the results back together.
    */
   public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort) throws IOException {
-    // don't specify the fields - we'll wait to do this until we get results
-    FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs);
-    int totalHits = 0;
-    MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
+    final MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
     for (int i = 0; i < searchables.length; i++) { // search each searcher
-      // Assume not too many searchables and cost of creating a thread is by far inferior to a search
-      msta[i] =
-        new MultiSearcherThread(
-          searchables[i],
-          weight,
-          filter,
-          nDocs,
-          hq,
-          sort,
-          i,
-          starts,
-          "MultiSearcher thread #" + (i + 1));
+      // Assume not too many searchables and cost of creating a thread is much less than a search
+      msta[i] = new MultiSearcherThread(i, weight, filter, nDocs, sort);
       msta[i].start();
     }
 
-    float maxScore=Float.NEGATIVE_INFINITY;
-
+    SubsearcherFieldDocSortedHitQueue shq = null;
+    int totalHits = 0;
+
     for (int i = 0; i < searchables.length; i++) {
       try {
         msta[i].join();
       } catch (InterruptedException ie) {
-        ; // TODO: what should we do with this???
+        // TODO: what should we do with this???
       }
-      IOException ioe = msta[i].getIOException();
-      if (ioe == null) {
-        totalHits += msta[i].hits();
-        maxScore=Math.max(maxScore, msta[i].getMaxScore());
-      } else {
+      IOException ioe = msta[i].ioe;
+      if (ioe != null) {
        // if one search produced an IOException, rethrow it
        throw ioe;
      }
+      TopFieldDocs docs = (TopFieldDocs) msta[i].docs;
+      totalHits += docs.totalHits;
+      if (docs.totalHits > 0) {
+        if (shq == null) shq = new SubsearcherFieldDocSortedHitQueue (docs.fields, nDocs);
+        final SubsearcherTopDocs subsearcherTopDocs = new SubsearcherTopDocs(docs, i);
+        subsearcherTopDocs.convertTopDoc(starts);
+        shq.put(subsearcherTopDocs);
+      }
     }
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
-    for (int i = hq.size() - 1; i >= 0; i--)       // put docs in array
-      scoreDocs[i] = (ScoreDoc) hq.pop();
+    // merge top nDocs sorted results into scoreDocs
+    ScoreDoc[] scoreDocs = new ScoreDoc[Math.min(totalHits, nDocs)];
 
-    return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
+    for (int i = 0; i < scoreDocs.length; i++) {
+      // using the top(), top().change(), adjustTop() optimized form
+      final SubsearcherTopDocs topSubsearcherHits = (SubsearcherTopDocs) shq.top();
+      final ScoreDoc topDoc = topSubsearcherHits.topDoc();
+      scoreDocs[i] = topDoc;
+
+      topSubsearcherHits.hitIdx++;
+
+      if (topSubsearcherHits.hasNext()) {
+        topSubsearcherHits.convertTopDoc(starts);
+        shq.adjustTop();
+      } else {
+        shq.pop();
+      }
+    }
+
+    float maxScore = (totalHits == 0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
+
+    return new TopFieldDocs(totalHits, scoreDocs, shq.getFields(), maxScore);
   }
 
   /** Lower-level search API.
@@ -162,130 +183,66 @@
    * @param filter if non-null, a bitset used to eliminate some documents
    * @param results to receive hits
    *
-   * @todo parallelize this one too
+   * @TODO parallelize this one too
    */
   public void search(Weight weight, Filter filter, final HitCollector results) throws IOException {
     for (int i = 0; i < searchables.length; i++) {
       final int start = starts[i];
       searchables[i].search(weight, filter, new HitCollector() {
-          public void collect(int doc, float score) {
-            results.collect(doc + start, score);
-          }
-        });
-
+        public void collect(int doc, float score) {
+          results.collect(doc + start, score);
+        }
+      });
     }
   }
 
-  /*
-   * TODO: this one could be parallelized too
+  /**
+   * @TODO: this one could be parallelized too
    * @see org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query)
    */
   public Query rewrite(Query original) throws IOException {
     return super.rewrite(original);
   }
-}
 
+  /**
+   * A Thread subclass for searching a single Searchable
+   */
+  class MultiSearcherThread extends Thread {
 
-/**
- * A thread subclass for searching a single searchable
- */
-class MultiSearcherThread extends Thread {
+    private final int i;
+    private final Searchable searchable;
+    private final Weight weight;
+    private final Filter filter;
+    private final int nDocs;
+    private final Sort sort;
+    TopDocs docs;
+    IOException ioe;
 
-  private Searchable searchable;
-  private Weight weight;
-  private Filter filter;
-  private int nDocs;
-  private TopDocs docs;
-  private int i;
-  private PriorityQueue hq;
-  private int[] starts;
-  private IOException ioe;
-  private Sort sort;
-
-  public MultiSearcherThread(
-    Searchable searchable,
-    Weight weight,
-    Filter filter,
-    int nDocs,
-    HitQueue hq,
-    int i,
-    int[] starts,
-    String name) {
-    super(name);
-    this.searchable = searchable;
-    this.weight = weight;
-    this.filter = filter;
-    this.nDocs = nDocs;
-    this.hq = hq;
-    this.i = i;
-    this.starts = starts;
-  }
-
-  public MultiSearcherThread(
-    Searchable searchable,
-    Weight weight,
-    Filter filter,
-    int nDocs,
-    FieldDocSortedHitQueue hq,
-    Sort sort,
-    int i,
-    int[] starts,
-    String name) {
-    super(name);
-    this.searchable = searchable;
-    this.weight = weight;
-    this.filter = filter;
-    this.nDocs = nDocs;
-    this.hq = hq;
-    this.i = i;
-    this.starts = starts;
-    this.sort = sort;
-  }
-
-  public void run() {
-    try {
-      docs = (sort == null) ? searchable.search (weight, filter, nDocs)
-      : searchable.search (weight, filter, nDocs, sort);
+    public MultiSearcherThread(int i,
+                               Weight weight,
+                               Filter filter,
+                               int nDocs,
+                               Sort sort
+                               ) {
+      super("MultiSearcher thread #" + (i + 1));
+      this.i = i;
+      this.searchable = searchables[i];
+      this.weight = weight;
+      this.filter = filter;
+      this.nDocs = nDocs;
+      this.sort = sort;
     }
-    // Store the IOException for later use by the caller of this thread
-    catch (IOException ioe) {
-      this.ioe = ioe;
-    }
-    if (ioe == null) {
-      // if we are sorting by fields, we need to tell the field sorted hit queue
-      // the actual type of fields, in case the original list contained AUTO.
-      // if the searchable returns null for fields, we'll have problems.
-      if (sort != null) {
-        ((FieldDocSortedHitQueue)hq).setFields (((TopFieldDocs)docs).fields);
+
+    public void run() {
+      try {
+        docs = (sort == null) ? searchable.search (weight, filter, nDocs)
+            : searchable.search (weight, filter, nDocs, sort);
+      } catch (IOException ioe) {
+        // Store the IOException for later use by the caller of this thread
+        this.ioe = ioe;
       }
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0;
-        j < scoreDocs.length;
-        j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i]; // convert doc
-        //it would be so nice if we had a thread-safe insert
-        synchronized (hq) {
-          if (!hq.insert(scoreDoc))
-            break;
-        } // no more scores > minScore
-      }
     }
   }
-
-  public int hits() {
-    return docs.totalHits;
-  }
-
-  public float getMaxScore() {
-    return docs.getMaxScore();
-  }
-
-  public IOException getIOException() {
-    return ioe;
-  }
-
 }
Index: src/java/org/apache/lucene/search/FieldDocSortedHitQueue.java
===================================================================
--- src/java/org/apache/lucene/search/FieldDocSortedHitQueue.java	(revision 724618)
+++ src/java/org/apache/lucene/search/FieldDocSortedHitQueue.java	(working copy)
@@ -93,6 +93,9 @@
     return ret;
   }
 
+  protected FieldDoc asFieldDoc (Object obj) {
+    return (FieldDoc)obj;
+  }
 
   /**
    * Returns whether a is less relevant than b.
@@ -100,9 +103,9 @@
    * @param b ScoreDoc
    * @return true if document a should be sorted after document b.
    */
-  protected final boolean lessThan (final Object a, final Object b) {
-    final FieldDoc docA = (FieldDoc) a;
-    final FieldDoc docB = (FieldDoc) b;
+  protected boolean lessThan (final Object a, final Object b) {
+    final FieldDoc docA = asFieldDoc(a);
+    final FieldDoc docB = asFieldDoc(b);
     final int n = fields.length;
     int c = 0;
     for (int i=0; i
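
The patch above replaces "insert every hit into one bounded queue" with a k-way merge: each sub-searcher's TopDocs is already sorted, so only the current best hit of each sub-searcher needs to sit in a priority queue, and the globally best hit is repeatedly taken from the top. The sketch below illustrates that merge strategy in isolation. It is not part of the patch and does not use Lucene's classes: the names (TopDocsMergeSketch, SubResult, mergeTop) are hypothetical, plain float[] arrays stand in for per-searcher ScoreDoc[] results, and java.util.PriorityQueue with poll/add replaces the top()/adjustTop() form used above.

import java.util.PriorityQueue;

/**
 * Standalone illustration of merging already-sorted per-searcher results,
 * in the spirit of the MultiSearcher/ParallelMultiSearcher change above.
 * Plain float[] score lists stand in for each sub-searcher's TopDocs.
 */
public class TopDocsMergeSketch {

  /** Cursor over one sub-searcher's scores, sorted in descending order. */
  static final class SubResult {
    final float[] scores;
    int idx;                         // position of the current best remaining hit

    SubResult(float[] scores) {
      this.scores = scores;
    }

    float top() {
      return scores[idx];
    }

    boolean hasNext() {
      return idx < scores.length;
    }
  }

  /** Returns the n highest scores across all sub-results. */
  static float[] mergeTop(float[][] subScores, int n) {
    // Head of the queue = sub-result whose current hit has the highest score.
    PriorityQueue<SubResult> queue =
        new PriorityQueue<SubResult>((a, b) -> Float.compare(b.top(), a.top()));

    int total = 0;
    for (float[] scores : subScores) {
      total += scores.length;
      if (scores.length > 0) {
        queue.add(new SubResult(scores));
      }
    }

    float[] merged = new float[Math.min(total, n)];
    for (int i = 0; i < merged.length; i++) {
      SubResult best = queue.poll();   // sub-result owning the globally best hit
      merged[i] = best.top();
      best.idx++;                      // advance its cursor ...
      if (best.hasNext()) {
        queue.add(best);               // ... and re-insert it if hits remain
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    float[][] subScores = { { 0.9f, 0.4f }, { 0.8f, 0.7f, 0.1f }, { } };
    for (float score : mergeTop(subScores, 3)) {
      System.out.println(score);       // prints 0.9, 0.8, 0.7
    }
  }
}

Lucene's own PriorityQueue avoids the poll/add pair by mutating the element on top and calling adjustTop(), which is why the patch advances hitIdx in place and only pops a sub-searcher's entry once it runs out of hits.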