Index: src/java/org/apache/lucene/search/ParallelMultiSearcher.java =================================================================== --- src/java/org/apache/lucene/search/ParallelMultiSearcher.java (revision 834055) +++ src/java/org/apache/lucene/search/ParallelMultiSearcher.java (working copy) @@ -18,9 +18,16 @@ */ import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; -import org.apache.lucene.index.IndexReader; +import org.apache.lucene.util.NamedThreadFactory; import org.apache.lucene.util.PriorityQueue; /** Implements parallel search over a set of Searchables. @@ -29,67 +36,58 @@ * or {@link #search(Query,Filter)} methods. */ public class ParallelMultiSearcher extends MultiSearcher { + + private final ExecutorService executor; + private final Searchable[] searchables; + private final int[] starts; - private Searchable[] searchables; - private int[] starts; - - /** Creates a searchable which searches searchables. */ + /** Creates a {@link Searchable} which searches searchables. */ public ParallelMultiSearcher(Searchable... searchables) throws IOException { super(searchables); this.searchables = searchables; this.starts = getStarts(); + executor = Executors.newCachedThreadPool(new NamedThreadFactory(this.getClass().getSimpleName())); } /** - * TODO: parallelize this one too + * Executes each {@link Searchable}'s docFreq() in its own thread and waits for each search to complete and merge + * the results back together. */ @Override - public int docFreq(Term term) throws IOException { - return super.docFreq(term); + public int docFreq(final Term term) throws IOException { + final ArrayList> searchThreads = new ArrayList>(searchables.length); + for (final Searchable searchable : searchables) { + searchThreads.add(executor.submit(new Callable(){ + public Integer call() throws Exception { + return Integer.valueOf(searchable.docFreq(term)); + } + })); + } + final CountDocFreq func = new CountDocFreq(); + map(func, searchThreads); + return func.docFreq; } /** - * A search implementation which spans a new thread for each - * Searchable, waits for each search to complete and merge + * A search implementation which executes each + * {@link Searchable} in its own thread and waits for each search to complete and merge * the results back together. */ @Override public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException { - HitQueue hq = new HitQueue(nDocs, false); - int totalHits = 0; - MultiSearcherThread[] msta = - new MultiSearcherThread[searchables.length]; + final HitQueue hq = new HitQueue(nDocs, false); + final ArrayList> searchThreads = new ArrayList>(searchables.length); for (int i = 0; i < searchables.length; i++) { // search each searchable - // Assume not too many searchables and cost of creating a thread is by far inferior to a search - msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs, - hq, i, starts, "MultiSearcher thread #" + (i + 1)); - msta[i].start(); + searchThreads.add(executor.submit( + new MultiSearcherThread(searchables[i], weight, filter, nDocs, hq, i, starts))); } - - for (int i = 0; i < searchables.length; i++) { - try { - msta[i].join(); - } catch (InterruptedException ie) { - // In 3.0 we will change this to throw - // InterruptedException instead - Thread.currentThread().interrupt(); - throw new RuntimeException(ie); - } - IOException ioe = msta[i].getIOException(); - if (ioe == null) { - totalHits += msta[i].hits(); - } else { - // if one search produced an IOException, rethrow it - throw ioe; - } - } - - ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()]; + final int totalHits = totalHits(searchThreads); + final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()]; for (int i = hq.size() - 1; i >= 0; i--) // put docs in array scoreDocs[i] = hq.pop(); - float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score; + final float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score; return new TopDocs(totalHits, scoreDocs, maxScore); } @@ -103,44 +101,22 @@ public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort) throws IOException { // don't specify the fields - we'll wait to do this until we get results - FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs); - int totalHits = 0; - MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length]; + final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs); + final ArrayList> searchThreads = new ArrayList>(searchables.length); for (int i = 0; i < searchables.length; i++) { // search each searchable - // Assume not too many searchables and cost of creating a thread is by far inferior to a search - msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs, - hq, sort, i, starts, "MultiSearcher thread #" + (i + 1)); - msta[i].start(); + searchThreads.add(executor.submit( + new MultiSearcherThread(searchables[i], weight, filter, nDocs, hq, sort, i, starts))); } - - float maxScore=Float.NEGATIVE_INFINITY; - - for (int i = 0; i < searchables.length; i++) { - try { - msta[i].join(); - } catch (InterruptedException ie) { - // In 3.0 we will change this to throw - // InterruptedException instead - Thread.currentThread().interrupt(); - throw new RuntimeException(ie); - } - IOException ioe = msta[i].getIOException(); - if (ioe == null) { - totalHits += msta[i].hits(); - maxScore=Math.max(maxScore, msta[i].getMaxScore()); - } else { - // if one search produced an IOException, rethrow it - throw ioe; - } - } - + final int totalHits = totalHits(searchThreads); ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()]; for (int i = hq.size() - 1; i >= 0; i--) // put docs in array scoreDocs[i] = hq.pop(); - return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore); + return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), Float.NEGATIVE_INFINITY); } + + /** Lower-level search API. * *

{@link Collector#collect(int)} is called for every matching document. @@ -197,41 +173,90 @@ public Query rewrite(Query original) throws IOException { return super.rewrite(original); } + + private int totalHits(ArrayList> searchThreads) throws IOException { + final CountTotalHits func = new CountTotalHits(); + map(func, searchThreads); + return func.totalHits; + } + + /* + * apply the function to each element of the list. This method encapsulates the logic how + * to wait for concurrently executed searchables. + */ + private void map(Function func, ArrayList> list) throws IOException{ + for (Future future : list) { + try{ + func.apply(future.get()); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) + throw (IOException) e.getCause(); + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // In 3.0 we will change this to throw + // InterruptedException instead + throw new RuntimeException(e); + } + } + } + +} +// Both functions could be reduced to Int as other values of TopDocs +// are not needed. Using sep. functions is more self documenting. +/** + * A function with one argument + * @param the argument type + */ +interface Function { + abstract void apply(T t); } /** + * Counts the total number of hits for all {@link TopDocs} instances + * provided. + */ +class CountTotalHits implements Function{ + int totalHits = 0; + public void apply(TopDocs t) { + totalHits += t.totalHits; + } +} +/** + * Accumulates the document frequency for a term. + */ +class CountDocFreq implements Function{ + int docFreq = 0; + public void apply(Integer t) { + docFreq += t.intValue(); + } +} + +/** * A thread subclass for searching a single searchable */ -class MultiSearcherThread extends Thread { +class MultiSearcherThread implements Callable { - private Searchable searchable; - private Weight weight; - private Filter filter; + private final Searchable searchable; + private final Weight weight; + private final Filter filter; private int nDocs; private TopDocs docs; - private int i; - private PriorityQueue hq; - private int[] starts; - private IOException ioe; - private Sort sort; + private final int i; + private final PriorityQueue hq; + private final int[] starts; + private final Sort sort; public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter, - int nDocs, HitQueue hq, int i, int[] starts, String name) { - super(name); - this.searchable = searchable; - this.weight = weight; - this.filter = filter; - this.nDocs = nDocs; - this.hq = hq; - this.i = i; - this.starts = starts; + int nDocs, PriorityQueue hq, int i, int[] starts) { + this(searchable, weight, filter, nDocs, hq, null, i, starts); + } public MultiSearcherThread(Searchable searchable, Weight weight, - Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i, - int[] starts, String name) { - super(name); + Filter filter, int nDocs, PriorityQueue hq, Sort sort, int i, + int[] starts) { this.searchable = searchable; this.weight = weight; this.filter = filter; @@ -242,63 +267,42 @@ this.sort = sort; } - @Override @SuppressWarnings ("unchecked") - public void run() { - try { - docs = (sort == null) ? searchable.search (weight, filter, nDocs) - : searchable.search (weight, filter, nDocs, sort); - } - // Store the IOException for later use by the caller of this thread - catch (IOException ioe) { - this.ioe = ioe; - } - if (ioe == null) { - if (sort != null) { - TopFieldDocs docsFields = (TopFieldDocs) docs; - // If one of the Sort fields is FIELD_DOC, need to fix its values, so that - // it will break ties by doc Id properly. Otherwise, it will compare to - // 'relative' doc Ids, that belong to two different searchables. - for (int j = 0; j < docsFields.fields.length; j++) { - if (docsFields.fields[j].getType() == SortField.DOC) { - // iterate over the score docs and change their fields value - for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) { - FieldDoc fd = (FieldDoc) docs.scoreDocs[j2]; - fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]); - } - break; + public TopDocs call() throws Exception { + docs = (sort == null) ? searchable.search (weight, filter, nDocs) + : searchable.search (weight, filter, nDocs, sort); + if (sort != null) { + TopFieldDocs docsFields = (TopFieldDocs) docs; + // If one of the Sort fields is FIELD_DOC, need to fix its values, so that + // it will break ties by doc Id properly. Otherwise, it will compare to + // 'relative' doc Ids, that belong to two different searchables. + for (int j = 0; j < docsFields.fields.length; j++) { + if (docsFields.fields[j].getType() == SortField.DOC) { + // iterate over the score docs and change their fields value + for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) { + FieldDoc fd = (FieldDoc) docs.scoreDocs[j2]; + fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]); } + break; } + } - ((FieldDocSortedHitQueue) hq).setFields(docsFields.fields); - } - ScoreDoc[] scoreDocs = docs.scoreDocs; - for (int j = 0; - j < scoreDocs.length; - j++) { // merge scoreDocs into hq - ScoreDoc scoreDoc = scoreDocs[j]; - scoreDoc.doc += starts[i]; // convert doc - //it would be so nice if we had a thread-safe insert - synchronized (hq) { - // this cast is bad, because we assume that the list has correct type. - // Because of that we have the @SuppressWarnings :-( - if (scoreDoc == ((PriorityQueue) hq).insertWithOverflow(scoreDoc)) - break; - } // no more scores > minScore - } + ((FieldDocSortedHitQueue) hq).setFields(docsFields.fields); } + ScoreDoc[] scoreDocs = docs.scoreDocs; + for (int j = 0; + j < scoreDocs.length; + j++) { // merge scoreDocs into hq + ScoreDoc scoreDoc = scoreDocs[j]; + scoreDoc.doc += starts[i]; // convert doc + //it would be so nice if we had a thread-safe insert + synchronized (hq) { + // this cast is bad, because we assume that the list has correct type. + // Because of that we have the @SuppressWarnings :-( + if (scoreDoc == ((PriorityQueue) hq).insertWithOverflow(scoreDoc)) + break; + } // no more scores > minScore + } + return docs; } - - public int hits() { - return docs.totalHits; - } - - public float getMaxScore() { - return docs.getMaxScore(); - } - - public IOException getIOException() { - return ioe; - } - } Index: src/java/org/apache/lucene/util/NamedThreadFactory.java =================================================================== --- src/java/org/apache/lucene/util/NamedThreadFactory.java (revision 0) +++ src/java/org/apache/lucene/util/NamedThreadFactory.java (revision 0) @@ -0,0 +1,53 @@ +/** + * + */ +package org.apache.lucene.util; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A default {@link ThreadFactory} implementation that accepts the name prefix + * of the created threads as a constructor argument. Otherwise, this factory + * yields the same semantics as the thread factory returned by + * {@link Executors#defaultThreadFactory()}. + */ +public class NamedThreadFactory implements ThreadFactory { + private static final AtomicInteger threadPoolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private static final String NAME_PATTERN = "%s-%d-thread"; + private final String threadNamePrefix; + + /** + * Creates a new {@link NamedThreadFactory} instance + * + * @param threadNamePrefix the name prefix assigned to each thread created. + */ + public NamedThreadFactory(String threadNamePrefix) { + final SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread() + .getThreadGroup(); + this.threadNamePrefix = String.format(NAME_PATTERN, + checkPrefix(threadNamePrefix), threadPoolNumber.getAndIncrement()); + } + + private static String checkPrefix(String prefix) { + return prefix == null || prefix.length() == 0 ? "Lucene" : prefix; + } + + /** + * Creates a new {@link Thread} + * + * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable) + */ + public Thread newThread(Runnable r) { + final Thread t = new Thread(group, r, String.format("%s-%d", + this.threadNamePrefix, threadNumber.getAndIncrement()), 0); + t.setDaemon(false); + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + +} Property changes on: src\java\org\apache\lucene\util\NamedThreadFactory.java ___________________________________________________________________ Added: svn:keywords + Date Author Id Revision HeadURL Added: svn:eol-style + native Index: src/test/org/apache/lucene/search/TestMultiSearcher.java =================================================================== --- src/test/org/apache/lucene/search/TestMultiSearcher.java (revision 834055) +++ src/test/org/apache/lucene/search/TestMultiSearcher.java (working copy) @@ -398,4 +398,17 @@ // if the same similarity is used. assertEquals("MultiSearcher score must be equal to single esrcher score!", score1, scoreN, 1e-6); } + + public void testDocFreq() throws IOException{ + RAMDirectory dir1 = new RAMDirectory(); + RAMDirectory dir2 = new RAMDirectory(); + + initIndex(dir1, 10, true, "x"); // documents with two tokens "doc0" and "x", "doc1" and x, etc... + initIndex(dir2, 5, true, "x"); // documents with two tokens "doc0" and "x", "doc1" and x, etc... + IndexSearcher searcher1 = new IndexSearcher(dir1, true); + IndexSearcher searcher2 = new IndexSearcher(dir2, true); + + MultiSearcher multiSearcher = getMultiSearcherInstance(new Searcher[]{searcher1, searcher2}); + assertEquals(15, multiSearcher.docFreq(new Term("contents","x"))); + } }