Index: src/java/org/apache/lucene/search/ParallelMultiSearcher.java =================================================================== --- src/java/org/apache/lucene/search/ParallelMultiSearcher.java (revision 952449) +++ src/java/org/apache/lucene/search/ParallelMultiSearcher.java (working copy) @@ -21,10 +21,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -67,17 +68,17 @@ */ @Override public int docFreq(final Term term) throws IOException { - @SuppressWarnings("unchecked") final Future[] searchThreads = new Future[searchables.length]; - for (int i = 0; i < searchables.length; i++) { // search each searchable + final CompletionService completionService = new ExecutorCompletionService(this.executor); + for(int i = 0; i < searchables.length; i++) { final Searchable searchable = searchables[i]; - searchThreads[i] = executor.submit(new Callable() { + completionService.submit(new Callable() { public Integer call() throws IOException { return Integer.valueOf(searchable.docFreq(term)); } }); } final CountDocFreq func = new CountDocFreq(); - foreach(func, Arrays.asList(searchThreads)); + foreach(func, completionService, searchables.length); return func.docFreq; } @@ -90,14 +91,15 @@ public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException { final HitQueue hq = new HitQueue(nDocs, false); final Lock lock = new ReentrantLock(); - @SuppressWarnings("unchecked") final Future[] searchThreads = new Future[searchables.length]; + final CompletionService completionService = new ExecutorCompletionService(this.executor); + for (int i = 0; i < searchables.length; i++) { // search each searchable - searchThreads[i] = executor.submit( + completionService.submit( new MultiSearcherCallableNoSort(lock, searchables[i], weight, filter, nDocs, hq, i, starts)); } final CountTotalHits func = new CountTotalHits(); - foreach(func, Arrays.asList(searchThreads)); + foreach(func, completionService, searchables.length); final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()]; for (int i = hq.size() - 1; i >= 0; i--) // put docs in array @@ -117,14 +119,14 @@ final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(nDocs); final Lock lock = new ReentrantLock(); - @SuppressWarnings("unchecked") final Future[] searchThreads = new Future[searchables.length]; + final CompletionService completionService = new ExecutorCompletionService(this.executor); for (int i = 0; i < searchables.length; i++) { // search each searchable - searchThreads[i] = executor.submit( + completionService.submit( new MultiSearcherCallableWithSort(lock, searchables[i], weight, filter, nDocs, hq, sort, i, starts)); } final CountTotalHits func = new CountTotalHits(); - foreach(func, Arrays.asList(searchThreads)); + foreach(func, completionService, searchables.length); final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()]; for (int i = hq.size() - 1; i >= 0; i--) // put docs in array @@ -192,13 +194,12 @@ HashMap createDocFrequencyMap(Set terms) throws IOException { final Term[] allTermsArray = terms.toArray(new Term[terms.size()]); final int[] aggregatedDocFreqs = new int[terms.size()]; - final ArrayList> searchThreads = new ArrayList>(searchables.length); + final CompletionService completionService = new ExecutorCompletionService(this.executor); for (Searchable searchable : searchables) { - final Future future = executor.submit( + completionService.submit( new DocumentFrequencyCallable(searchable, allTermsArray)); - searchThreads.add(future); } - foreach(new AggregateDocFrequency(aggregatedDocFreqs), searchThreads); + foreach(new AggregateDocFrequency(aggregatedDocFreqs), completionService, searchables.length); final HashMap dfMap = new HashMap(); for(int i=0; i void foreach(Function func, List> list) throws IOException{ - for (Future future : list) { + private void foreach(Function func, CompletionService service, int numTasks) throws IOException { + for(int i = 0; i < numTasks; i++) { try{ - func.apply(future.get()); + func.apply(service.take().get()); } catch (ExecutionException e) { final Throwable throwable = e.getCause(); if (throwable instanceof IOException) @@ -225,7 +226,7 @@ } } } - + // Both functions could be reduced to Int as other values of TopDocs // are not needed. Using sep. functions is more self documenting. /**