Index: src/java/org/apache/lucene/search/ParallelMultiSearcher.java
===================================================================
--- src/java/org/apache/lucene/search/ParallelMultiSearcher.java (revision 834250)
+++ src/java/org/apache/lucene/search/ParallelMultiSearcher.java (working copy)
@@ -18,9 +18,16 @@
*/
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
-import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util.PriorityQueue;
/** Implements parallel search over a set of Searchables.
@@ -29,69 +36,60 @@
* or {@link #search(Query,Filter)} methods.
*/
public class ParallelMultiSearcher extends MultiSearcher {
+
+ private final ExecutorService executor;
+ private final Searchable[] searchables;
+ private final int[] starts;
- private Searchable[] searchables;
- private int[] starts;
-
- /** Creates a searchable which searches searchables. */
+ /** Creates a {@link Searchable} which searches searchables. */
public ParallelMultiSearcher(Searchable... searchables) throws IOException {
super(searchables);
this.searchables = searchables;
this.starts = getStarts();
+ executor = Executors.newCachedThreadPool(new NamedThreadFactory(this.getClass().getSimpleName()));
}
/**
- * TODO: parallelize this one too
+ * Executes each {@link Searchable}'s docFreq() in its own thread and waits for each search to complete and merge
+ * the results back together.
*/
@Override
- public int docFreq(Term term) throws IOException {
- return super.docFreq(term);
+ public int docFreq(final Term term) throws IOException {
+ final ArrayList> searchThreads = new ArrayList>(searchables.length);
+ for (final Searchable searchable : searchables) {
+ searchThreads.add(executor.submit(new Callable(){
+ public Integer call() throws Exception {
+ return Integer.valueOf(searchable.docFreq(term));
+ }
+ }));
+ }
+ final CountDocFreq func = new CountDocFreq();
+ map(func, searchThreads);
+ return func.docFreq;
}
/**
- * A search implementation which spans a new thread for each
- * Searchable, waits for each search to complete and merge
+ * A search implementation which executes each
+ * {@link Searchable} in its own thread and waits for each search to complete and merge
* the results back together.
*/
@Override
- public TopDocs search(Weight weight, Filter filter, int nDocs)
- throws IOException {
- HitQueue hq = new HitQueue(nDocs, false);
- int totalHits = 0;
- MultiSearcherThread[] msta =
- new MultiSearcherThread[searchables.length];
+ public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
+ final HitQueue hq = new HitQueue(nDocs, false);
+ final ArrayList> searchThreads = new ArrayList>(searchables.length);
for (int i = 0; i < searchables.length; i++) { // search each searchable
- // Assume not too many searchables and cost of creating a thread is by far inferior to a search
- msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
- hq, i, starts, "MultiSearcher thread #" + (i + 1));
- msta[i].start();
+ searchThreads.add(executor.submit(
+ new MultiSearcherThreadNoSort(searchables[i], weight, filter, nDocs, hq, i, starts)));
}
- for (int i = 0; i < searchables.length; i++) {
- try {
- msta[i].join();
- } catch (InterruptedException ie) {
- // In 3.0 we will change this to throw
- // InterruptedException instead
- Thread.currentThread().interrupt();
- throw new RuntimeException(ie);
- }
- IOException ioe = msta[i].getIOException();
- if (ioe == null) {
- totalHits += msta[i].hits();
- } else {
- // if one search produced an IOException, rethrow it
- throw ioe;
- }
- }
+ final CountTotalHits func = new CountTotalHits();
+ map(func, searchThreads);
- ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+ final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
scoreDocs[i] = hq.pop();
- float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
-
- return new TopDocs(totalHits, scoreDocs, maxScore);
+ return new TopDocs(func.totalHits, scoreDocs, func.maxScore);
}
/**
@@ -100,45 +98,25 @@
* the results back together.
*/
@Override
- public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort)
- throws IOException {
+ public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort) throws IOException {
+ if (sort == null) throw new NullPointerException();
+
// don't specify the fields - we'll wait to do this until we get results
- FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs);
- int totalHits = 0;
- MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
+ final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs);
+ final ArrayList> searchThreads = new ArrayList>(searchables.length);
for (int i = 0; i < searchables.length; i++) { // search each searchable
- // Assume not too many searchables and cost of creating a thread is by far inferior to a search
- msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
- hq, sort, i, starts, "MultiSearcher thread #" + (i + 1));
- msta[i].start();
+ searchThreads.add(executor.submit(
+ new MultiSearcherThreadWithSort(searchables[i], weight, filter, nDocs, hq, sort, i, starts)));
}
- float maxScore=Float.NEGATIVE_INFINITY;
-
- for (int i = 0; i < searchables.length; i++) {
- try {
- msta[i].join();
- } catch (InterruptedException ie) {
- // In 3.0 we will change this to throw
- // InterruptedException instead
- Thread.currentThread().interrupt();
- throw new RuntimeException(ie);
- }
- IOException ioe = msta[i].getIOException();
- if (ioe == null) {
- totalHits += msta[i].hits();
- maxScore=Math.max(maxScore, msta[i].getMaxScore());
- } else {
- // if one search produced an IOException, rethrow it
- throw ioe;
- }
- }
+ final CountTotalHits func = new CountTotalHits();
+ map(func, searchThreads);
ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
scoreDocs[i] = hq.pop();
- return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
+ return new TopFieldDocs(func.totalHits, scoreDocs, hq.getFields(), func.maxScore);
}
/** Lower-level search API.
@@ -188,117 +166,160 @@
searchables[i].search(weight, filter, hc);
}
}
-
+
/*
- * TODO: this one could be parallelized too
- * @see org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query)
+ * apply the function to each element of the list. This method encapsulates the logic how
+ * to wait for concurrently executed searchables.
*/
- @Override
- public Query rewrite(Query original) throws IOException {
- return super.rewrite(original);
+ private void map(Function func, ArrayList> list) throws IOException{
+ for (Future future : list) {
+ try{
+ func.apply(future.get());
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException)
+ throw (IOException) e.getCause();
+ throw new RuntimeException(e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // In 3.0 we will change this to throw
+ // InterruptedException instead
+ throw new RuntimeException(e);
+ }
+ }
}
-}
+ // Both functions could be reduced to Int as other values of TopDocs
+ // are not needed. Using sep. functions is more self documenting.
+ /**
+ * A function with one argument
+ * @param the argument type
+ */
+ private static interface Function {
+ abstract void apply(T t);
+ }
-/**
- * A thread subclass for searching a single searchable
- */
-class MultiSearcherThread extends Thread {
+ /**
+ * Counts the total number of hits for all {@link TopDocs} instances
+ * provided.
+ */
+ private static class CountTotalHits implements Function {
+ int totalHits = 0;
+ float maxScore = Float.NEGATIVE_INFINITY;
+
+ public void apply(T t) {
+ totalHits += t.totalHits;
+ maxScore = Math.max(maxScore, t.getMaxScore());
+ }
+ }
+ /**
+ * Accumulates the document frequency for a term.
+ */
+ private static class CountDocFreq implements Function{
+ int docFreq = 0;
+
+ public void apply(Integer t) {
+ docFreq += t.intValue();
+ }
+ }
- private Searchable searchable;
- private Weight weight;
- private Filter filter;
- private int nDocs;
- private TopDocs docs;
- private int i;
- private PriorityQueue extends ScoreDoc> hq;
- private int[] starts;
- private IOException ioe;
- private Sort sort;
+ /**
+ * A thread subclass for searching a single searchable
+ */
+ private static class MultiSearcherThreadNoSort implements Callable {
- public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter,
- int nDocs, HitQueue hq, int i, int[] starts, String name) {
- super(name);
- this.searchable = searchable;
- this.weight = weight;
- this.filter = filter;
- this.nDocs = nDocs;
- this.hq = hq;
- this.i = i;
- this.starts = starts;
- }
+ private final Searchable searchable;
+ private final Weight weight;
+ private final Filter filter;
+ private final int nDocs;
+ private final int i;
+ private final HitQueue hq;
+ private final int[] starts;
- public MultiSearcherThread(Searchable searchable, Weight weight,
- Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i,
- int[] starts, String name) {
- super(name);
- this.searchable = searchable;
- this.weight = weight;
- this.filter = filter;
- this.nDocs = nDocs;
- this.hq = hq;
- this.i = i;
- this.starts = starts;
- this.sort = sort;
+ public MultiSearcherThreadNoSort(Searchable searchable, Weight weight,
+ Filter filter, int nDocs, HitQueue hq, int i, int[] starts) {
+ this.searchable = searchable;
+ this.weight = weight;
+ this.filter = filter;
+ this.nDocs = nDocs;
+ this.hq = hq;
+ this.i = i;
+ this.starts = starts;
+ }
+
+ public TopDocs call() throws Exception {
+ final TopDocs docs = searchable.search (weight, filter, nDocs);
+ final ScoreDoc[] scoreDocs = docs.scoreDocs;
+ for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
+ final ScoreDoc scoreDoc = scoreDocs[j];
+ scoreDoc.doc += starts[i]; // convert doc
+ //it would be so nice if we had a thread-safe insert
+ synchronized (hq) {
+ if (scoreDoc == hq.insertWithOverflow(scoreDoc))
+ break;
+ } // no more scores > minScore
+ }
+ return docs;
+ }
}
- @Override
- @SuppressWarnings ("unchecked")
- public void run() {
- try {
- docs = (sort == null) ? searchable.search (weight, filter, nDocs)
- : searchable.search (weight, filter, nDocs, sort);
+ /**
+ * A thread subclass for searching a single searchable
+ */
+ private static class MultiSearcherThreadWithSort implements Callable {
+
+ private final Searchable searchable;
+ private final Weight weight;
+ private final Filter filter;
+ private final int nDocs;
+ private final int i;
+ private final FieldDocSortedHitQueue hq;
+ private final int[] starts;
+ private final Sort sort;
+
+ public MultiSearcherThreadWithSort(Searchable searchable, Weight weight,
+ Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i, int[] starts) {
+ this.searchable = searchable;
+ this.weight = weight;
+ this.filter = filter;
+ this.nDocs = nDocs;
+ this.hq = hq;
+ this.i = i;
+ this.starts = starts;
+ this.sort = sort;
}
- // Store the IOException for later use by the caller of this thread
- catch (IOException ioe) {
- this.ioe = ioe;
- }
- if (ioe == null) {
- if (sort != null) {
- TopFieldDocs docsFields = (TopFieldDocs) docs;
- // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
- // it will break ties by doc Id properly. Otherwise, it will compare to
- // 'relative' doc Ids, that belong to two different searchables.
- for (int j = 0; j < docsFields.fields.length; j++) {
- if (docsFields.fields[j].getType() == SortField.DOC) {
- // iterate over the score docs and change their fields value
- for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
- FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
- fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
- }
- break;
+
+ public TopFieldDocs call() throws Exception {
+ final TopFieldDocs docs = searchable.search (weight, filter, nDocs, sort);
+ // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
+ // it will break ties by doc Id properly. Otherwise, it will compare to
+ // 'relative' doc Ids, that belong to two different searchables.
+ for (int j = 0; j < docs.fields.length; j++) {
+ if (docs.fields[j].getType() == SortField.DOC) {
+ // iterate over the score docs and change their fields value
+ for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
+ FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
+ fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
}
+ break;
}
+ }
- ((FieldDocSortedHitQueue) hq).setFields(docsFields.fields);
+ synchronized (hq) {
+ hq.setFields(docs.fields);
}
- ScoreDoc[] scoreDocs = docs.scoreDocs;
- for (int j = 0;
- j < scoreDocs.length;
- j++) { // merge scoreDocs into hq
- ScoreDoc scoreDoc = scoreDocs[j];
- scoreDoc.doc += starts[i]; // convert doc
+
+ final ScoreDoc[] scoreDocs = docs.scoreDocs;
+ for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
+ final FieldDoc fieldDoc = (FieldDoc) scoreDocs[j];
+ fieldDoc.doc += starts[i]; // convert doc
//it would be so nice if we had a thread-safe insert
synchronized (hq) {
- // this cast is bad, because we assume that the list has correct type.
- // Because of that we have the @SuppressWarnings :-(
- if (scoreDoc == ((PriorityQueue) hq).insertWithOverflow(scoreDoc))
+ if (fieldDoc == hq.insertWithOverflow(fieldDoc))
break;
} // no more scores > minScore
}
+ return docs;
}
}
-
- public int hits() {
- return docs.totalHits;
- }
-
- public float getMaxScore() {
- return docs.getMaxScore();
- }
-
- public IOException getIOException() {
- return ioe;
- }
-
+
}
Index: src/java/org/apache/lucene/util/NamedThreadFactory.java
===================================================================
--- src/java/org/apache/lucene/util/NamedThreadFactory.java (revision 0)
+++ src/java/org/apache/lucene/util/NamedThreadFactory.java (revision 0)
@@ -0,0 +1,53 @@
+/**
+ *
+ */
+package org.apache.lucene.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A default {@link ThreadFactory} implementation that accepts the name prefix
+ * of the created threads as a constructor argument. Otherwise, this factory
+ * yields the same semantics as the thread factory returned by
+ * {@link Executors#defaultThreadFactory()}.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+ private static final AtomicInteger threadPoolNumber = new AtomicInteger(1);
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private static final String NAME_PATTERN = "%s-%d-thread";
+ private final String threadNamePrefix;
+
+ /**
+ * Creates a new {@link NamedThreadFactory} instance
+ *
+ * @param threadNamePrefix the name prefix assigned to each thread created.
+ */
+ public NamedThreadFactory(String threadNamePrefix) {
+ final SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread()
+ .getThreadGroup();
+ this.threadNamePrefix = String.format(NAME_PATTERN,
+ checkPrefix(threadNamePrefix), threadPoolNumber.getAndIncrement());
+ }
+
+ private static String checkPrefix(String prefix) {
+ return prefix == null || prefix.length() == 0 ? "Lucene" : prefix;
+ }
+
+ /**
+ * Creates a new {@link Thread}
+ *
+ * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
+ */
+ public Thread newThread(Runnable r) {
+ final Thread t = new Thread(group, r, String.format("%s-%d",
+ this.threadNamePrefix, threadNumber.getAndIncrement()), 0);
+ t.setDaemon(false);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+
+}
Property changes on: src\java\org\apache\lucene\util\NamedThreadFactory.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
Index: src/test/org/apache/lucene/search/TestMultiSearcher.java
===================================================================
--- src/test/org/apache/lucene/search/TestMultiSearcher.java (revision 834250)
+++ src/test/org/apache/lucene/search/TestMultiSearcher.java (working copy)
@@ -396,6 +396,19 @@
// The scores from the IndexSearcher and Multisearcher should be the same
// if the same similarity is used.
- assertEquals("MultiSearcher score must be equal to single esrcher score!", score1, scoreN, 1e-6);
+ assertEquals("MultiSearcher score must be equal to single searcher score!", score1, scoreN, 1e-6);
}
+
+ public void testDocFreq() throws IOException{
+ RAMDirectory dir1 = new RAMDirectory();
+ RAMDirectory dir2 = new RAMDirectory();
+
+ initIndex(dir1, 10, true, "x"); // documents with two tokens "doc0" and "x", "doc1" and x, etc...
+ initIndex(dir2, 5, true, "x"); // documents with two tokens "doc0" and "x", "doc1" and x, etc...
+ IndexSearcher searcher1 = new IndexSearcher(dir1, true);
+ IndexSearcher searcher2 = new IndexSearcher(dir2, true);
+
+ MultiSearcher multiSearcher = getMultiSearcherInstance(new Searcher[]{searcher1, searcher2});
+ assertEquals(15, multiSearcher.docFreq(new Term("contents","x")));
+ }
}