Index: src/java/org/apache/lucene/search/MultiSearcher.java
===================================================================
--- src/java/org/apache/lucene/search/MultiSearcher.java	(revision 834250)
+++ src/java/org/apache/lucene/search/MultiSearcher.java	(working copy)
@@ -29,6 +29,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 /** Implements search over a set of Searchables.
  *
@@ -199,15 +200,10 @@
 
     HitQueue hq = new HitQueue(nDocs, false);
     int totalHits = 0;
 
-    for (int i = 0; i < searchables.length; i++) { // search each searcher
-      TopDocs docs = searchables[i].search(weight, filter, nDocs);
-      totalHits += docs.totalHits;          // update totalHits
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i];                // convert doc
-        if(scoreDoc == hq.insertWithOverflow(scoreDoc))
-          break;                                // no more scores > minScore
+    synchronized(hq) { // we synchronize to prevent a separate sync on each hq call
+      for (int i = 0; i < searchables.length; i++) { // search each searcher
+        final TopDocs docs = new MultiSearcherCallableNoSort(searchables[i], weight, filter, nDocs, hq, i, starts).call();
+        totalHits += docs.totalHits; // update totalHits
       }
     }
 
@@ -221,41 +217,21 @@
   }
 
   @Override
-  public TopFieldDocs search (Weight weight, Filter filter, int n, Sort sort)
-  throws IOException {
-    FieldDocSortedHitQueue hq = null;
+  public TopFieldDocs search (Weight weight, Filter filter, int n, Sort sort) throws IOException {
+    FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(null, n);
     int totalHits = 0;
     float maxScore=Float.NEGATIVE_INFINITY;
 
-    for (int i = 0; i < searchables.length; i++) { // search each searcher
-      TopFieldDocs docs = searchables[i].search (weight, filter, n, sort);
-      // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
-      // it will break ties by doc Id properly. Otherwise, it will compare to
-      // 'relative' doc Ids, that belong to two different searchers.
-      for (int j = 0; j < docs.fields.length; j++) {
-        if (docs.fields[j].getType() == SortField.DOC) {
-          // iterate over the score docs and change their fields value
-          for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
-            FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
-            fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
-          }
-          break;
-        }
+    synchronized(hq) { // we synchronize to prevent a separate sync on each hq call
+      for (int i = 0; i < searchables.length; i++) { // search each searcher
+        final TopFieldDocs docs = new MultiSearcherCallableWithSort(searchables[i], weight, filter, n, hq, sort, i, starts).call();
+        totalHits += docs.totalHits; // update totalHits
+        maxScore = Math.max(maxScore, docs.getMaxScore());
       }
-      if (hq == null) hq = new FieldDocSortedHitQueue (docs.fields, n);
-      totalHits += docs.totalHits;          // update totalHits
-      maxScore = Math.max(maxScore, docs.getMaxScore());
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i];                // convert doc
-        if (scoreDoc == hq.insertWithOverflow((FieldDoc) scoreDoc))
-          break;                                  // no more scores > minScore
-      }
     }
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+    final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
     for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
       scoreDocs[i] = hq.pop();
 
@@ -355,4 +331,104 @@
     return rewrittenQuery.weight(cacheSim);
   }
 
+  /**
+   * A {@link Callable} that searches a single {@link Searchable}.
+   */
+  static class MultiSearcherCallableNoSort implements Callable<TopDocs> {
+
+    private final Searchable searchable;
+    private final Weight weight;
+    private final Filter filter;
+    private final int nDocs;
+    private final int i;
+    private final HitQueue hq;
+    private final int[] starts;
+
+    public MultiSearcherCallableNoSort(Searchable searchable, Weight weight,
+        Filter filter, int nDocs, HitQueue hq, int i, int[] starts) {
+      this.searchable = searchable;
+      this.weight = weight;
+      this.filter = filter;
+      this.nDocs = nDocs;
+      this.hq = hq;
+      this.i = i;
+      this.starts = starts;
+    }
+
+    public TopDocs call() throws IOException {
+      final TopDocs docs = searchable.search (weight, filter, nDocs);
+      final ScoreDoc[] scoreDocs = docs.scoreDocs;
+      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
+        final ScoreDoc scoreDoc = scoreDocs[j];
+        scoreDoc.doc += starts[i]; // convert doc
+        //it would be so nice if we had a thread-safe insert
+        synchronized (hq) {
+          if (scoreDoc == hq.insertWithOverflow(scoreDoc))
+            break;
+        } // no more scores > minScore
+      }
+      return docs;
+    }
+  }
+
+  /**
+   * A {@link Callable} that searches a single {@link Searchable}, merging sorted hits.
+   */
+  static class MultiSearcherCallableWithSort implements Callable<TopFieldDocs> {
+
+    private final Searchable searchable;
+    private final Weight weight;
+    private final Filter filter;
+    private final int nDocs;
+    private final int i;
+    private final FieldDocSortedHitQueue hq;
+    private final int[] starts;
+    private final Sort sort;
+
+    public MultiSearcherCallableWithSort(Searchable searchable, Weight weight,
+        Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i, int[] starts) {
+      this.searchable = searchable;
+      this.weight = weight;
+      this.filter = filter;
+      this.nDocs = nDocs;
+      this.hq = hq;
+      this.i = i;
+      this.starts = starts;
+      this.sort = sort;
+    }
+
+    public TopFieldDocs call() throws IOException {
+      final TopFieldDocs docs = searchable.search (weight, filter, nDocs, sort);
+      // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
+      // it will break ties by doc Id properly. Otherwise, it will compare to
+      // 'relative' doc Ids, that belong to two different searchables.
+      for (int j = 0; j < docs.fields.length; j++) {
+        if (docs.fields[j].getType() == SortField.DOC) {
+          // iterate over the score docs and change their fields value
+          for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
+            FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
+            fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
+          }
+          break;
+        }
+      }
+
+      synchronized (hq) {
+        hq.setFields(docs.fields);
+      }
+
+      final ScoreDoc[] scoreDocs = docs.scoreDocs;
+      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
+        final FieldDoc fieldDoc = (FieldDoc) scoreDocs[j];
+        fieldDoc.doc += starts[i]; // convert doc
+        //it would be so nice if we had a thread-safe insert
+        synchronized (hq) {
+          if (fieldDoc == hq.insertWithOverflow(fieldDoc))
+            break;
+        } // no more scores > minScore
+      }
+      return docs;
+    }
+  }
 }
Index: src/java/org/apache/lucene/search/ParallelMultiSearcher.java
===================================================================
--- src/java/org/apache/lucene/search/ParallelMultiSearcher.java	(revision 834250)
+++ src/java/org/apache/lucene/search/ParallelMultiSearcher.java	(working copy)
@@ -18,9 +18,16 @@
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
 
-import org.apache.lucene.index.Term;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.util.NamedThreadFactory;
 import org.apache.lucene.util.PriorityQueue;
 
 /** Implements parallel search over a set of Searchables.
  *
@@ -29,69 +36,60 @@
  * or {@link #search(Query,Filter)} methods.
  */
 public class ParallelMultiSearcher extends MultiSearcher {
+
+  private final ExecutorService executor;
+  private final Searchable[] searchables;
+  private final int[] starts;
 
-  private Searchable[] searchables;
-  private int[] starts;
-
-  /** Creates a searchable which searches searchables. */
+  /** Creates a {@link Searchable} which searches searchables. */
   public ParallelMultiSearcher(Searchable... searchables) throws IOException {
     super(searchables);
     this.searchables = searchables;
    this.starts = getStarts();
+    executor = Executors.newCachedThreadPool(new NamedThreadFactory(this.getClass().getSimpleName()));
   }
 
   /**
-   * TODO: parallelize this one too
+   * Executes each {@link Searchable}'s docFreq() in its own thread, waits for
+   * each call to complete, and merges the results back together.
    */
   @Override
-  public int docFreq(Term term) throws IOException {
-    return super.docFreq(term);
+  public int docFreq(final Term term) throws IOException {
+    final ArrayList<Future<Integer>> searchThreads = new ArrayList<Future<Integer>>(searchables.length);
+    for (final Searchable searchable : searchables) {
+      searchThreads.add(executor.submit(new Callable<Integer>() {
+        public Integer call() throws Exception {
+          return Integer.valueOf(searchable.docFreq(term));
+        }
+      }));
+    }
+    final CountDocFreq func = new CountDocFreq();
+    map(func, searchThreads);
+    return func.docFreq;
   }
 
   /**
-   * A search implementation which spans a new thread for each
-   * Searchable, waits for each search to complete and merge
+   * A search implementation which executes each
+   * {@link Searchable} in its own thread, waits for each search to complete, and merges
    * the results back together.
    */
   @Override
-  public TopDocs search(Weight weight, Filter filter, int nDocs)
-    throws IOException {
-    HitQueue hq = new HitQueue(nDocs, false);
-    int totalHits = 0;
-    MultiSearcherThread[] msta =
-      new MultiSearcherThread[searchables.length];
+  public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
+    final HitQueue hq = new HitQueue(nDocs, false);
+    final ArrayList<Future<TopDocs>> searchThreads = new ArrayList<Future<TopDocs>>(searchables.length);
     for (int i = 0; i < searchables.length; i++) { // search each searchable
-      // Assume not too many searchables and cost of creating a thread is by far inferior to a search
-      msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
-          hq, i, starts, "MultiSearcher thread #" + (i + 1));
-      msta[i].start();
+      searchThreads.add(executor.submit(
+          new MultiSearcherCallableNoSort(searchables[i], weight, filter, nDocs, hq, i, starts)));
     }
 
-    for (int i = 0; i < searchables.length; i++) {
-      try {
-        msta[i].join();
-      } catch (InterruptedException ie) {
-        // In 3.0 we will change this to throw
-        // InterruptedException instead
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(ie);
-      }
-      IOException ioe = msta[i].getIOException();
-      if (ioe == null) {
-        totalHits += msta[i].hits();
-      } else {
-        // if one search produced an IOException, rethrow it
-        throw ioe;
-      }
-    }
+    final CountTotalHits<TopDocs> func = new CountTotalHits<TopDocs>();
+    map(func, searchThreads);
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+    final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
     for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
       scoreDocs[i] = hq.pop();
 
-    float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
-
-    return new TopDocs(totalHits, scoreDocs, maxScore);
+    return new TopDocs(func.totalHits, scoreDocs, func.maxScore);
   }
 
   /**
@@ -100,45 +98,25 @@
    * the results back together.
    */
   @Override
-  public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort)
-    throws IOException {
+  public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort) throws IOException {
+    if (sort == null) throw new NullPointerException();
+
     // don't specify the fields - we'll wait to do this until we get results
-    FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs);
-    int totalHits = 0;
-    MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
+    final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs);
+    final ArrayList<Future<TopFieldDocs>> searchThreads = new ArrayList<Future<TopFieldDocs>>(searchables.length);
     for (int i = 0; i < searchables.length; i++) { // search each searchable
-      // Assume not too many searchables and cost of creating a thread is by far inferior to a search
-      msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
-          hq, sort, i, starts, "MultiSearcher thread #" + (i + 1));
-      msta[i].start();
+      searchThreads.add(executor.submit(
+          new MultiSearcherCallableWithSort(searchables[i], weight, filter, nDocs, hq, sort, i, starts)));
     }
 
-    float maxScore=Float.NEGATIVE_INFINITY;
-
-    for (int i = 0; i < searchables.length; i++) {
-      try {
-        msta[i].join();
-      } catch (InterruptedException ie) {
-        // In 3.0 we will change this to throw
-        // InterruptedException instead
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(ie);
-      }
-      IOException ioe = msta[i].getIOException();
-      if (ioe == null) {
-        totalHits += msta[i].hits();
-        maxScore=Math.max(maxScore, msta[i].getMaxScore());
-      } else {
-        // if one search produced an IOException, rethrow it
-        throw ioe;
-      }
-    }
+    final CountTotalHits<TopFieldDocs> func = new CountTotalHits<TopFieldDocs>();
+    map(func, searchThreads);
 
     ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
     for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
       scoreDocs[i] = hq.pop();
 
-    return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
+    return new TopFieldDocs(func.totalHits, scoreDocs, hq.getFields(), func.maxScore);
   }
 
   /** Lower-level search API.
@@ -188,117 +166,60 @@
       searchables[i].search(weight, filter, hc);
     }
   }
-  
+
   /*
-   * TODO: this one could be parallelized too
-   * @see org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query)
+   * Applies the function to each element of the list. This method encapsulates the logic of how
+   * to wait for the concurrently executed searchables.
    */
-  @Override
-  public Query rewrite(Query original) throws IOException {
-    return super.rewrite(original);
-  }
-
-}
-
-/**
- * A thread subclass for searching a single searchable
- */
-class MultiSearcherThread extends Thread {
-
-  private Searchable searchable;
-  private Weight weight;
-  private Filter filter;
-  private int nDocs;
-  private TopDocs docs;
-  private int i;
-  private PriorityQueue hq;
-  private int[] starts;
-  private IOException ioe;
-  private Sort sort;
-
-  public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter,
-      int nDocs, HitQueue hq, int i, int[] starts, String name) {
-    super(name);
-    this.searchable = searchable;
-    this.weight = weight;
-    this.filter = filter;
-    this.nDocs = nDocs;
-    this.hq = hq;
-    this.i = i;
-    this.starts = starts;
-  }
-
-  public MultiSearcherThread(Searchable searchable, Weight weight,
-      Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i,
-      int[] starts, String name) {
-    super(name);
-    this.searchable = searchable;
-    this.weight = weight;
-    this.filter = filter;
-    this.nDocs = nDocs;
-    this.hq = hq;
-    this.i = i;
-    this.starts = starts;
-    this.sort = sort;
-  }
-
-  @Override
-  @SuppressWarnings ("unchecked")
-  public void run() {
-    try {
-      docs = (sort == null) ? searchable.search (weight, filter, nDocs)
-          : searchable.search (weight, filter, nDocs, sort);
-    }
-    // Store the IOException for later use by the caller of this thread
-    catch (IOException ioe) {
-      this.ioe = ioe;
-    }
-    if (ioe == null) {
-      if (sort != null) {
-        TopFieldDocs docsFields = (TopFieldDocs) docs;
-        // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
-        // it will break ties by doc Id properly. Otherwise, it will compare to
-        // 'relative' doc Ids, that belong to two different searchables.
-        for (int j = 0; j < docsFields.fields.length; j++) {
-          if (docsFields.fields[j].getType() == SortField.DOC) {
-            // iterate over the score docs and change their fields value
-            for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
-              FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
-              fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
-            }
-            break;
-          }
-        }
-
-        ((FieldDocSortedHitQueue) hq).setFields(docsFields.fields);
+  private <A> void map(Function<A> func, ArrayList<Future<A>> list) throws IOException {
+    for (Future<A> future : list) {
+      try {
+        func.apply(future.get());
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof IOException)
+          throw (IOException) e.getCause();
+        throw new RuntimeException(e.getCause());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        // In 3.0 we will change this to throw
+        // InterruptedException instead
+        throw new RuntimeException(e);
      }
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0;
-           j < scoreDocs.length;
-           j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i]; // convert doc
-        //it would be so nice if we had a thread-safe insert
-        synchronized (hq) {
-          // this cast is bad, because we assume that the list has correct type.
-          // Because of that we have the @SuppressWarnings :-(
-          if (scoreDoc == ((PriorityQueue) hq).insertWithOverflow(scoreDoc))
-            break;
-        } // no more scores > minScore
-      }
     }
   }
 
-  public int hits() {
-    return docs.totalHits;
+  // Both functions could be reduced to Integer, as the other values of TopDocs
+  // are not needed. Using separate functions is more self-documenting.
+  /**
+   * A function with one argument
+   * @param <T> the argument type
+   */
+  private static interface Function<T> {
+    abstract void apply(T t);
   }
 
-  public float getMaxScore() {
-    return docs.getMaxScore();
+  /**
+   * Counts the total number of hits and the maximum score for all
+   * {@link TopDocs} instances provided.
+   */
+  private static class CountTotalHits<T extends TopDocs> implements Function<T> {
+    int totalHits = 0;
+    float maxScore = Float.NEGATIVE_INFINITY;
+
+    public void apply(T t) {
+      totalHits += t.totalHits;
+      maxScore = Math.max(maxScore, t.getMaxScore());
+    }
   }
-
-  public IOException getIOException() {
-    return ioe;
+
+  /**
+   * Accumulates the document frequency for a term.
+   */
+  private static class CountDocFreq implements Function<Integer> {
+    int docFreq = 0;
+
+    public void apply(Integer t) {
+      docFreq += t.intValue();
+    }
   }
 }
Index: src/java/org/apache/lucene/util/NamedThreadFactory.java
===================================================================
--- src/java/org/apache/lucene/util/NamedThreadFactory.java	(revision 0)
+++ src/java/org/apache/lucene/util/NamedThreadFactory.java	(revision 0)
@@ -0,0 +1,53 @@
+/**
+ *
+ */
+package org.apache.lucene.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A default {@link ThreadFactory} implementation that accepts the name prefix
+ * of the created threads as a constructor argument. Otherwise, this factory
+ * yields the same semantics as the thread factory returned by
+ * {@link Executors#defaultThreadFactory()}.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+  private static final AtomicInteger threadPoolNumber = new AtomicInteger(1);
+  private final ThreadGroup group;
+  private final AtomicInteger threadNumber = new AtomicInteger(1);
+  private static final String NAME_PATTERN = "%s-%d-thread";
+  private final String threadNamePrefix;
+
+  /**
+   * Creates a new {@link NamedThreadFactory} instance
+   *
+   * @param threadNamePrefix the name prefix assigned to each thread created.
+   */
+  public NamedThreadFactory(String threadNamePrefix) {
+    final SecurityManager s = System.getSecurityManager();
+    group = (s != null) ? s.getThreadGroup() : Thread.currentThread()
+        .getThreadGroup();
+    this.threadNamePrefix = String.format(NAME_PATTERN,
+        checkPrefix(threadNamePrefix), threadPoolNumber.getAndIncrement());
+  }
+
+  private static String checkPrefix(String prefix) {
+    return prefix == null || prefix.length() == 0 ? "Lucene" : prefix;
+  }
+
+  /**
+   * Creates a new {@link Thread}
+   *
+   * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
+   */
+  public Thread newThread(Runnable r) {
+    final Thread t = new Thread(group, r, String.format("%s-%d",
+        this.threadNamePrefix, threadNumber.getAndIncrement()), 0);
+    t.setDaemon(false);
+    t.setPriority(Thread.NORM_PRIORITY);
+    return t;
+  }
+
+}

Property changes on: src\java\org\apache\lucene\util\NamedThreadFactory.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: src/test/org/apache/lucene/search/TestMultiSearcher.java
===================================================================
--- src/test/org/apache/lucene/search/TestMultiSearcher.java	(revision 834250)
+++ src/test/org/apache/lucene/search/TestMultiSearcher.java	(working copy)
@@ -396,6 +396,19 @@
 
     // The scores from the IndexSearcher and Multisearcher should be the same
     // if the same similarity is used.
- assertEquals("MultiSearcher score must be equal to single esrcher score!", score1, scoreN, 1e-6); + assertEquals("MultiSearcher score must be equal to single searcher score!", score1, scoreN, 1e-6); } + + public void testDocFreq() throws IOException{ + RAMDirectory dir1 = new RAMDirectory(); + RAMDirectory dir2 = new RAMDirectory(); + + initIndex(dir1, 10, true, "x"); // documents with two tokens "doc0" and "x", "doc1" and x, etc... + initIndex(dir2, 5, true, "x"); // documents with two tokens "doc0" and "x", "doc1" and x, etc... + IndexSearcher searcher1 = new IndexSearcher(dir1, true); + IndexSearcher searcher2 = new IndexSearcher(dir2, true); + + MultiSearcher multiSearcher = getMultiSearcherInstance(new Searcher[]{searcher1, searcher2}); + assertEquals(15, multiSearcher.docFreq(new Term("contents","x"))); + } }