>(searchables.length);
for (int i = 0; i < searchables.length; i++) { // search each searchable
- // Assume not too many searchables and cost of creating a thread is by far inferior to a search
- msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
- hq, sort, i, starts, "MultiSearcher thread #" + (i + 1));
- msta[i].start();
+ searchThreads.add(executor.submit(
+ new MultiSearcherThread(searchables[i], weight, filter, nDocs, hq, sort, i, starts)));
}
-
- float maxScore=Float.NEGATIVE_INFINITY;
-
- for (int i = 0; i < searchables.length; i++) {
- try {
- msta[i].join();
- } catch (InterruptedException ie) {
- // In 3.0 we will change this to throw
- // InterruptedException instead
- Thread.currentThread().interrupt();
- throw new RuntimeException(ie);
- }
- IOException ioe = msta[i].getIOException();
- if (ioe == null) {
- totalHits += msta[i].hits();
- maxScore=Math.max(maxScore, msta[i].getMaxScore());
- } else {
- // if one search produced an IOException, rethrow it
- throw ioe;
- }
- }
-
+ final int totalHits = totalHits(searchThreads);
ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
scoreDocs[i] = hq.pop();
- return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
+ return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), Float.NEGATIVE_INFINITY);
}
+
+
/** Lower-level search API.
*
* {@link Collector#collect(int)} is called for every matching document.
@@ -197,41 +173,90 @@
public Query rewrite(Query original) throws IOException {
return super.rewrite(original);
}
+
+ private int totalHits(ArrayList> searchThreads) throws IOException {
+ final CountTotalHits func = new CountTotalHits();
+ map(func, searchThreads);
+ return func.totalHits;
+ }
+
+ /*
+ * apply the function to each element of the list. This method encapsulates the logic how
+ * to wait for concurrently executed searchables.
+ */
+ private void map(Function func, ArrayList> list) throws IOException{
+ for (Future future : list) {
+ try{
+ func.apply(future.get());
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException)
+ throw (IOException) e.getCause();
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // In 3.0 we will change this to throw
+ // InterruptedException instead
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
+// Both functions could be reduced to Int as other values of TopDocs
+// are not needed. Using sep. functions is more self documenting.
+/**
+ * A function with one argument
+ * @param the argument type
+ */
+interface Function {
+ abstract void apply(T t);
}
/**
+ * Counts the total number of hits for all {@link TopDocs} instances
+ * provided.
+ */
+class CountTotalHits implements Function{
+ int totalHits = 0;
+ public void apply(TopDocs t) {
+ totalHits += t.totalHits;
+ }
+}
+/**
+ * Accumulates the document frequency for a term.
+ */
+class CountDocFreq implements Function{
+ int docFreq = 0;
+ public void apply(Integer t) {
+ docFreq += t.intValue();
+ }
+}
+
+/**
* A thread subclass for searching a single searchable
*/
-class MultiSearcherThread extends Thread {
+class MultiSearcherThread implements Callable {
- private Searchable searchable;
- private Weight weight;
- private Filter filter;
+ private final Searchable searchable;
+ private final Weight weight;
+ private final Filter filter;
private int nDocs;
private TopDocs docs;
- private int i;
- private PriorityQueue extends ScoreDoc> hq;
- private int[] starts;
- private IOException ioe;
- private Sort sort;
+ private final int i;
+ private final PriorityQueue extends ScoreDoc> hq;
+ private final int[] starts;
+ private final Sort sort;
public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter,
- int nDocs, HitQueue hq, int i, int[] starts, String name) {
- super(name);
- this.searchable = searchable;
- this.weight = weight;
- this.filter = filter;
- this.nDocs = nDocs;
- this.hq = hq;
- this.i = i;
- this.starts = starts;
+ int nDocs, PriorityQueue extends ScoreDoc> hq, int i, int[] starts) {
+ this(searchable, weight, filter, nDocs, hq, null, i, starts);
+
}
public MultiSearcherThread(Searchable searchable, Weight weight,
- Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i,
- int[] starts, String name) {
- super(name);
+ Filter filter, int nDocs, PriorityQueue extends ScoreDoc> hq, Sort sort, int i,
+ int[] starts) {
this.searchable = searchable;
this.weight = weight;
this.filter = filter;
@@ -242,63 +267,42 @@
this.sort = sort;
}
- @Override
@SuppressWarnings ("unchecked")
- public void run() {
- try {
- docs = (sort == null) ? searchable.search (weight, filter, nDocs)
- : searchable.search (weight, filter, nDocs, sort);
- }
- // Store the IOException for later use by the caller of this thread
- catch (IOException ioe) {
- this.ioe = ioe;
- }
- if (ioe == null) {
- if (sort != null) {
- TopFieldDocs docsFields = (TopFieldDocs) docs;
- // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
- // it will break ties by doc Id properly. Otherwise, it will compare to
- // 'relative' doc Ids, that belong to two different searchables.
- for (int j = 0; j < docsFields.fields.length; j++) {
- if (docsFields.fields[j].getType() == SortField.DOC) {
- // iterate over the score docs and change their fields value
- for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
- FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
- fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
- }
- break;
+ public TopDocs call() throws Exception {
+ docs = (sort == null) ? searchable.search (weight, filter, nDocs)
+ : searchable.search (weight, filter, nDocs, sort);
+ if (sort != null) {
+ TopFieldDocs docsFields = (TopFieldDocs) docs;
+ // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
+ // it will break ties by doc Id properly. Otherwise, it will compare to
+ // 'relative' doc Ids, that belong to two different searchables.
+ for (int j = 0; j < docsFields.fields.length; j++) {
+ if (docsFields.fields[j].getType() == SortField.DOC) {
+ // iterate over the score docs and change their fields value
+ for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
+ FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
+ fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
}
+ break;
}
+ }
- ((FieldDocSortedHitQueue) hq).setFields(docsFields.fields);
- }
- ScoreDoc[] scoreDocs = docs.scoreDocs;
- for (int j = 0;
- j < scoreDocs.length;
- j++) { // merge scoreDocs into hq
- ScoreDoc scoreDoc = scoreDocs[j];
- scoreDoc.doc += starts[i]; // convert doc
- //it would be so nice if we had a thread-safe insert
- synchronized (hq) {
- // this cast is bad, because we assume that the list has correct type.
- // Because of that we have the @SuppressWarnings :-(
- if (scoreDoc == ((PriorityQueue) hq).insertWithOverflow(scoreDoc))
- break;
- } // no more scores > minScore
- }
+ ((FieldDocSortedHitQueue) hq).setFields(docsFields.fields);
}
+ ScoreDoc[] scoreDocs = docs.scoreDocs;
+ for (int j = 0;
+ j < scoreDocs.length;
+ j++) { // merge scoreDocs into hq
+ ScoreDoc scoreDoc = scoreDocs[j];
+ scoreDoc.doc += starts[i]; // convert doc
+ //it would be so nice if we had a thread-safe insert
+ synchronized (hq) {
+ // this cast is bad, because we assume that the list has correct type.
+ // Because of that we have the @SuppressWarnings :-(
+ if (scoreDoc == ((PriorityQueue) hq).insertWithOverflow(scoreDoc))
+ break;
+ } // no more scores > minScore
+ }
+ return docs;
}
-
- public int hits() {
- return docs.totalHits;
- }
-
- public float getMaxScore() {
- return docs.getMaxScore();
- }
-
- public IOException getIOException() {
- return ioe;
- }
-
}
Index: src/java/org/apache/lucene/util/NamedThreadFactory.java
===================================================================
--- src/java/org/apache/lucene/util/NamedThreadFactory.java (revision 0)
+++ src/java/org/apache/lucene/util/NamedThreadFactory.java (revision 0)
@@ -0,0 +1,53 @@
+/**
+ *
+ */
+package org.apache.lucene.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A default {@link ThreadFactory} implementation that accepts the name prefix
+ * of the created threads as a constructor argument. Otherwise, this factory
+ * yields the same semantics as the thread factory returned by
+ * {@link Executors#defaultThreadFactory()}.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+ private static final AtomicInteger threadPoolNumber = new AtomicInteger(1);
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private static final String NAME_PATTERN = "%s-%d-thread";
+ private final String threadNamePrefix;
+
+ /**
+ * Creates a new {@link NamedThreadFactory} instance
+ *
+ * @param threadNamePrefix the name prefix assigned to each thread created.
+ */
+ public NamedThreadFactory(String threadNamePrefix) {
+ final SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread()
+ .getThreadGroup();
+ this.threadNamePrefix = String.format(NAME_PATTERN,
+ checkPrefix(threadNamePrefix), threadPoolNumber.getAndIncrement());
+ }
+
+ private static String checkPrefix(String prefix) {
+ return prefix == null || prefix.length() == 0 ? "Lucene" : prefix;
+ }
+
+ /**
+ * Creates a new {@link Thread}
+ *
+ * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
+ */
+ public Thread newThread(Runnable r) {
+ final Thread t = new Thread(group, r, String.format("%s-%d",
+ this.threadNamePrefix, threadNumber.getAndIncrement()), 0);
+ t.setDaemon(false);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+
+}
Property changes on: src\java\org\apache\lucene\util\NamedThreadFactory.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
Index: src/test/org/apache/lucene/search/TestMultiSearcher.java
===================================================================
--- src/test/org/apache/lucene/search/TestMultiSearcher.java (revision 834055)
+++ src/test/org/apache/lucene/search/TestMultiSearcher.java (working copy)
@@ -398,4 +398,17 @@
// if the same similarity is used.
assertEquals("MultiSearcher score must be equal to single esrcher score!", score1, scoreN, 1e-6);
}
+
+ public void testDocFreq() throws IOException{
+ RAMDirectory dir1 = new RAMDirectory();
+ RAMDirectory dir2 = new RAMDirectory();
+
+ initIndex(dir1, 10, true, "x"); // documents with two tokens "doc0" and "x", "doc1" and x, etc...
+ initIndex(dir2, 5, true, "x"); // documents with two tokens "doc0" and "x", "doc1" and x, etc...
+ IndexSearcher searcher1 = new IndexSearcher(dir1, true);
+ IndexSearcher searcher2 = new IndexSearcher(dir2, true);
+
+ MultiSearcher multiSearcher = getMultiSearcherInstance(new Searcher[]{searcher1, searcher2});
+ assertEquals(15, multiSearcher.docFreq(new Term("contents","x")));
+ }
}