Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt	(revision 1214418)
+++ lucene/CHANGES.txt	(working copy)
@@ -751,6 +751,12 @@
   where they would create invalid offsets in some situations, leading to
   problems in highlighting.  (Max Beutel via Robert Muir)
 
+* LUCENE-3639: TopDocs.merge was incorrectly setting TopDocs.maxScore to
+  Float.MIN_VALUE when it should be Float.NaN, when there were 0
+  hits.  Also improved the age calculation in SearcherLifetimeManager
+  to use double precision, and to compute a searcher's age as how long
+  ago it was replaced by a new searcher.  (Mike McCandless)
+
 Documentation
 
 * LUCENE-3597: Fixed incorrect grouping documentation. (Martijn van
   Groningen, Robert Muir)
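As an illustration of the first fix (not part of the patch itself): after merging shard results that collected no hits, callers can now test maxScore with Float.isNaN. A minimal sketch, assuming the trunk TopDocs.merge(Sort, int, TopDocs[]) API, where a null Sort means sort-by-score:

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

public class MergeEmptyShardsSketch {
  public static void main(String[] args) throws Exception {
    // Two shards, neither of which collected any hits:
    final TopDocs shard0 = new TopDocs(0, new ScoreDoc[0], Float.NaN);
    final TopDocs shard1 = new TopDocs(0, new ScoreDoc[0], Float.NaN);

    final TopDocs merged = TopDocs.merge(null, 10, new TopDocs[] {shard0, shard1});

    // Before this fix merged.getMaxScore() was Float.MIN_VALUE; with
    // it, NaN unambiguously means "no hits, so no meaningful max score":
    assert Float.isNaN(merged.getMaxScore());
    assert merged.totalHits == 0;
  }
}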
Index: lucene/src/test/org/apache/lucene/search/TestShardSearching.java
===================================================================
--- lucene/src/test/org/apache/lucene/search/TestShardSearching.java	(revision 0)
+++ lucene/src/test/org/apache/lucene/search/TestShardSearching.java	(working copy)
@@ -0,0 +1,393 @@
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec;
+import org.apache.lucene.util._TestUtil;
+
+// TODO
+//   - other queries besides PrefixQuery & TermQuery (but:
+//     FuzzyQ will be problematic... the top N terms it
+//     takes means results will differ)
+//   - NRQ/F
+//   - BQ, negated clauses, negated prefix clauses
+//   - test pulling docs in 2nd round trip...
+//   - filter too
+
+@UseNoMemoryExpensiveCodec
+public class TestShardSearching extends ShardSearchingTestBase {
+
+  private static class PreviousSearchState {
+    public final long searchTimeNanos;
+    public final long[] versions;
+    public final ScoreDoc searchAfterLocal;
+    public final ScoreDoc searchAfterShard;
+    public final Sort sort;
+    public final Query query;
+    public final int numHitsPaged;
+
+    public PreviousSearchState(Query query, Sort sort, ScoreDoc searchAfterLocal, ScoreDoc searchAfterShard, long[] versions, int numHitsPaged) {
+      this.versions = versions.clone();
+      this.searchAfterLocal = searchAfterLocal;
+      this.searchAfterShard = searchAfterShard;
+      this.sort = sort;
+      this.query = query;
+      this.numHitsPaged = numHitsPaged;
+      searchTimeNanos = System.nanoTime();
+    }
+  }
+
+  public void testSimple() throws Exception {
+    final int numNodes = _TestUtil.nextInt(random, 1, 10);
+
+    final double runTimeSec = 5.0 * RANDOM_MULTIPLIER;
+
+    final int minDocsToMakeTerms = _TestUtil.nextInt(random, 5, 20);
+
+    final int maxSearcherAgeSeconds = _TestUtil.nextInt(random, 1, 4);
+
+    if (VERBOSE) {
+      System.out.println("TEST: numNodes=" + numNodes + " runTimeSec=" + runTimeSec + " maxSearcherAgeSeconds=" + maxSearcherAgeSeconds);
+    }
+
+    start(_TestUtil.getTempDir("TestShardSearching").toString(),
+          numNodes,
+          runTimeSec,
+          maxSearcherAgeSeconds);
+
+    final List<PreviousSearchState> priorSearches = new ArrayList<PreviousSearchState>();
+    List<BytesRef> terms = null;
+    while (System.nanoTime() < endTimeNanos) {
+
+      final boolean doFollowon = priorSearches.size() > 0 && random.nextInt(7) == 1;
+
+      // Pick a random node; we will run the query on this node:
+      final int myNodeID = random.nextInt(numNodes);
+
+      final NodeState.ShardIndexSearcher localShardSearcher;
+
+      final PreviousSearchState prevSearchState;
+
+      if (doFollowon) {
+        // Pretend a user issued a follow-on query:
+        prevSearchState = priorSearches.get(random.nextInt(priorSearches.size()));
+
+        if (VERBOSE) {
+          System.out.println("\nTEST: follow-on query age=" + ((System.nanoTime() - prevSearchState.searchTimeNanos)/1000000000.0));
+        }
+
+        try {
+          localShardSearcher = nodes[myNodeID].acquire(prevSearchState.versions);
+        } catch (SearcherExpiredException see) {
+          // Expected, sometimes; in a "real" app we would
+          // either forward this error to the user ("too
+          // much time has passed; please re-run your
+          // search") or sneakily just switch to the newest
+          // searcher w/o telling them...
+          if (VERBOSE) {
+            System.out.println("  searcher expired during local shard searcher init: " + see);
+          }
+          priorSearches.remove(prevSearchState);
+          continue;
+        }
+      } else {
+        if (VERBOSE) {
+          System.out.println("\nTEST: fresh query");
+        }
+        // Do a fresh query:
+        localShardSearcher = nodes[myNodeID].acquire();
+        prevSearchState = null;
+      }
+
+      final IndexReader[] subs = new IndexReader[numNodes];
+
+      PreviousSearchState searchState = null;
+
+      try {
+
+        // Mock: now make a single reader (MultiReader) from all node
+        // searchers.  In a real shard env you can't do this... we
+        // do it to confirm results from the shard searcher
+        // are correct:
+        int docCount = 0;
+        try {
+          for(int nodeID=0;nodeID<numNodes;nodeID++) {
+            final long subVersion = localShardSearcher.nodeVersions[nodeID];
+            final IndexSearcher sub = nodes[nodeID].searchers.acquire(subVersion);
+            if (sub == null) {
+              nodeID--;
+              while(nodeID >= 0) {
+                subs[nodeID].decRef();
+                subs[nodeID] = null;
+                nodeID--;
+              }
+              throw new SearcherExpiredException("nodeID=" + nodeID + " version=" + subVersion);
+            }
+            subs[nodeID] = sub.getIndexReader();
+            docCount += subs[nodeID].maxDoc();
+          }
+        } catch (SearcherExpiredException see) {
+          // Expected
+          if (VERBOSE) {
+            System.out.println("  searcher expired during mock reader init: " + see);
+          }
+          continue;
+        }
+
+        final IndexReader mockReader = new MultiReader(subs);
+        final IndexSearcher mockSearcher = new IndexSearcher(mockReader);
+
+        Query query;
+        Sort sort;
+
+        if (prevSearchState != null) {
+          query = prevSearchState.query;
+          sort = prevSearchState.sort;
+        } else {
+          if (terms == null && docCount > minDocsToMakeTerms) {
+            // TODO: try to "focus" on high freq terms sometimes too
+            // TODO: maybe also periodically reset the terms...?
+            final TermsEnum termsEnum = MultiFields.getTerms(mockReader, "body").iterator(null);
+            terms = new ArrayList<BytesRef>();
+            while(termsEnum.next() != null) {
+              terms.add(BytesRef.deepCopyOf(termsEnum.term()));
+            }
+            if (VERBOSE) {
+              System.out.println("TEST: init terms: " + terms.size() + " terms");
+            }
+            if (terms.size() == 0) {
+              terms = null;
+            }
+          }
+
+          if (VERBOSE) {
+            System.out.println("  maxDoc=" + mockReader.maxDoc());
+          }
+
+          if (terms != null) {
+            if (random.nextBoolean()) {
+              query = new TermQuery(new Term("body", terms.get(random.nextInt(terms.size()))));
+            } else {
+              final String t = terms.get(random.nextInt(terms.size())).utf8ToString();
+              final String prefix;
+              if (t.length() <= 1) {
+                prefix = t;
+              } else {
+                prefix = t.substring(0, _TestUtil.nextInt(random, 1, 2));
+              }
+              query = new PrefixQuery(new Term("body", prefix));
+            }
+
+            if (random.nextBoolean()) {
+              sort = null;
+            } else {
+              // TODO: sort by more than 1 field
+              final int what = random.nextInt(4);
+              if (what == 0) {
+                sort = new Sort(SortField.FIELD_SCORE);
+              } else if (what == 1) {
+                // TODO: this sort doesn't merge
+                // correctly... it's tricky because you
+                // could have > 2.1B docs across all shards:
+                //sort = new Sort(SortField.FIELD_DOC);
+                sort = null;
+              } else if (what == 2) {
+                sort = new Sort(new SortField[] {new SortField("docid", SortField.Type.INT, random.nextBoolean())});
+              } else {
+                sort = new Sort(new SortField[] {new SortField("title", SortField.Type.STRING, random.nextBoolean())});
+              }
+            }
+          } else {
+            query = null;
+            sort = null;
+          }
+        }
+
+        if (query != null) {
+
+          try {
+            searchState = assertSame(mockSearcher, localShardSearcher, query, sort, prevSearchState);
+          } catch (SearcherExpiredException see) {
+            // Expected; in a "real" app we would
+            // either forward this error to the user ("too
+            // much time has passed; please re-run your
+            // search") or sneakily just switch to the newest
+            // searcher w/o telling them...
+            if (VERBOSE) {
+              System.out.println("  searcher expired during search: " + see);
+              see.printStackTrace(System.out);
+            }
+            assert prevSearchState != null;
+            priorSearches.remove(prevSearchState);
+          }
+        }
+      } finally {
+        nodes[myNodeID].release(localShardSearcher);
+        for(IndexReader sub : subs) {
+          if (sub != null) {
+            sub.decRef();
+          }
+        }
+      }
+
+      if (searchState != null && searchState.searchAfterLocal != null && random.nextInt(5) == 3) {
+        priorSearches.add(searchState);
+        if (priorSearches.size() > 200) {
+          Collections.shuffle(priorSearches, random);
+          priorSearches.subList(100, priorSearches.size()).clear();
+        }
+      }
+    }
+
+    finish();
+  }
+
+  private PreviousSearchState assertSame(IndexSearcher mockSearcher, NodeState.ShardIndexSearcher shardSearcher, Query q, Sort sort, PreviousSearchState state) throws IOException {
+
+    int numHits = _TestUtil.nextInt(random, 1, 100);
+    if (state != null && state.searchAfterLocal == null) {
+      // In addition to what we last searched:
+      numHits += state.numHitsPaged;
+    }
+
+    if (VERBOSE) {
+      System.out.println("TEST: query=" + q + " sort=" + sort + " numHits=" + numHits);
+      if (state != null) {
+        System.out.println("  prev: searchAfterLocal=" + state.searchAfterLocal + " searchAfterShard=" + state.searchAfterShard + " numHitsPaged=" + state.numHitsPaged);
+      }
+    }
+
+    // Single (mock local) searcher:
+    final TopDocs hits;
+    if (sort == null) {
+      if (state != null && state.searchAfterLocal != null) {
+        hits = mockSearcher.searchAfter(state.searchAfterLocal, q, numHits);
+      } else {
+        hits = mockSearcher.search(q, numHits);
+      }
+    } else {
+      hits = mockSearcher.search(q, numHits, sort);
+    }
+
+    // Shard searcher:
+    final TopDocs shardHits;
+    if (sort == null) {
+      if (state != null && state.searchAfterShard != null) {
+        shardHits = shardSearcher.searchAfter(state.searchAfterShard, q, numHits);
+      } else {
+        shardHits = shardSearcher.search(q, numHits);
+      }
+    } else {
+      shardHits = shardSearcher.search(q, numHits, sort);
+    }
+
+    final int numNodes = shardSearcher.nodeVersions.length;
+    int[] base = new int[numNodes];
+    final IndexReader[] subs = mockSearcher.getIndexReader().getSequentialSubReaders();
+    assertEquals(numNodes, subs.length);
+
+    int docCount = 0;
+    for(int nodeID=0;nodeID<numNodes;nodeID++) {
+      base[nodeID] = docCount;
+      docCount += subs[nodeID].maxDoc();
+    }
+
+    int numHitsPaged = hits.scoreDocs.length;
+    if (state != null && state.searchAfterLocal != null) {
+      numHitsPaged += state.numHitsPaged;
+    }
+
+    final boolean moreHits;
+    final ScoreDoc bottomHit;
+    final ScoreDoc bottomHitShards;
+
+    if (numHitsPaged < hits.totalHits) {
+      // More hits to page through:
+      moreHits = true;
+      if (sort == null) {
+        bottomHit = hits.scoreDocs[hits.scoreDocs.length-1];
+        final ScoreDoc shardHit = shardHits.scoreDocs[shardHits.scoreDocs.length-1];
+        // Keep the shard-space (non-rebased) docID for the
+        // continuation:
+        bottomHitShards = new ScoreDoc(shardHit.doc, shardHit.score, shardHit.shardIndex);
+      } else {
+        // TODO: also test searchAfter for sort-by-field
+        bottomHit = null;
+        bottomHitShards = null;
+      }
+    } else {
+      assertEquals(hits.totalHits, numHitsPaged);
+      bottomHit = null;
+      bottomHitShards = null;
+      moreHits = false;
+    }
+
+    // Confirm the shard searcher agrees with the single
+    // searcher, rebasing each shard hit's docID into the
+    // MultiReader's docID space:
+    assertEquals(hits.totalHits, shardHits.totalHits);
+    assertEquals(hits.scoreDocs.length, shardHits.scoreDocs.length);
+    for(int hitIDX=0;hitIDX<hits.scoreDocs.length;hitIDX++) {
+      final ScoreDoc hit = hits.scoreDocs[hitIDX];
+      final ScoreDoc shardHit = shardHits.scoreDocs[hitIDX];
+      assertEquals(hit.doc, shardHit.doc + base[shardHit.shardIndex]);
+      assertEquals(hit.score, shardHit.score, 0.00001f);
+    }
+
+    if (moreHits) {
+      // Return a continuation so a later iteration can page
+      // deeper into these results:
+      return new PreviousSearchState(q, sort, bottomHit, bottomHitShards, shardSearcher.nodeVersions, numHitsPaged);
+    } else {
+      return null;
+    }
+  }
+}
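The searchAfterLocal/searchAfterShard bookkeeping above is the deep-paging pattern behind IndexSearcher.searchAfter: keep the bottom ScoreDoc of each page and pass it back as the anchor for the next page. A standalone sketch, not from the patch; the class name, the "body" field and the query term are illustrative only:

import java.io.IOException;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;

public class PagingSketch {
  public static void pageThrough(IndexSearcher searcher, int pageSize) throws IOException {
    final TermQuery query = new TermQuery(new Term("body", "lucene"));
    ScoreDoc after = null;  // null means: start from the first page
    while (true) {
      final TopDocs page = after == null
          ? searcher.search(query, pageSize)
          : searcher.searchAfter(after, query, pageSize);
      if (page.scoreDocs.length == 0) {
        break;  // paged past the last hit
      }
      for (ScoreDoc hit : page.scoreDocs) {
        System.out.println(hit);  // e.g. doc=42 score=1.73 shardIndex=-1
      }
      // The bottom hit anchors the next page, just like the test's
      // searchAfterLocal/searchAfterShard:
      after = page.scoreDocs[page.scoreDocs.length - 1];
    }
  }
}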
Index: lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java
===================================================================
--- lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java	(revision 1214418)
+++ lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java	(working copy)
@@ -227,15 +228,15 @@
   public final static class PruneByAge implements Pruner {
-    private final int maxAgeSec;
+    private final double maxAgeSec;
 
-    public PruneByAge(int maxAgeSec) {
+    public PruneByAge(double maxAgeSec) {
       if (maxAgeSec < 0) {
         throw new IllegalArgumentException("maxAgeSec must be > 0 (got " + maxAgeSec + ")");
       }
       this.maxAgeSec = maxAgeSec;
     }
 
     @Override
-    public boolean doPrune(int ageSec, IndexSearcher searcher) {
+    public boolean doPrune(double ageSec, IndexSearcher searcher) {
       return ageSec > maxAgeSec;
     }
   }
@@ -261,14 +262,25 @@
       trackers.add(tracker);
     }
     Collections.sort(trackers);
-    final long newestSec = trackers.isEmpty() ? 0L : trackers.get(0).recordTimeSec;
+    double lastRecordTimeSec = 0.0;
+    final double now = System.nanoTime()/NANOS_PER_SEC;
     for (SearcherTracker tracker: trackers) {
-      final int ageSec = (int) (newestSec - tracker.recordTimeSec);
-      assert ageSec >= 0;
+      final double ageSec;
+      if (lastRecordTimeSec == 0.0) {
+        ageSec = 0.0;
+      } else {
+        ageSec = now - lastRecordTimeSec;
+      }
+      // The first tracker is always age 0.0 sec, since it's
+      // still "live"; the second tracker's age (= seconds since
+      // it stopped being "live") is now minus the first tracker's
+      // recordTime, etc:
       if (pruner.doPrune(ageSec, tracker.searcher)) {
+        //System.out.println("PRUNE version=" + tracker.version + " age=" + ageSec + " ms=" + System.currentTimeMillis());
         searchers.remove(tracker.version);
         tracker.close();
       }
+      lastRecordTimeSec = tracker.recordTimeSec;
     }
   }
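With this change a tracked searcher's age is measured in fractional seconds from the moment a newer searcher replaced it, not from the newest searcher's record time, and the newest searcher is always age 0.0. A minimal usage sketch (the class and method names here are illustrative; newSearcher is assumed to come from your own reopen logic):

import java.io.IOException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherLifetimeManager;

public class LifetimeSketch {
  private final SearcherLifetimeManager mgr = new SearcherLifetimeManager();

  // On each reopen: record the new searcher; the returned token is
  // what you hand back to clients for follow-on (paging) requests.
  public long onReopen(IndexSearcher newSearcher) throws IOException {
    return mgr.record(newSearcher);
  }

  // On a follow-on request: re-acquire the exact searcher the first
  // page used; null means it was already pruned (searcher expired).
  public IndexSearcher lookup(long version) {
    return mgr.acquire(version);
  }

  // Callers must release() whatever they acquire():
  public void done(IndexSearcher s) throws IOException {
    mgr.release(s);
  }

  // Periodically (e.g. from a background thread): drop searchers that
  // were superseded more than 10 minutes ago.
  public void pruneOld() throws IOException {
    mgr.prune(new SearcherLifetimeManager.PruneByAge(600.0));
  }
}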
Index: lucene/src/java/org/apache/lucene/search/TopDocs.java
===================================================================
--- lucene/src/java/org/apache/lucene/search/TopDocs.java	(revision 1214418)
+++ lucene/src/java/org/apache/lucene/search/TopDocs.java	(working copy)
@@ -216,8 +216,10 @@
     float maxScore = Float.MIN_VALUE;
     for(int shardIDX=0;shardIDX<shardHits.length;shardIDX++) {
       final TopDocs shard = shardHits[shardIDX];
+      // totalHits can be non-zero for this shard even if no hits
+      // were collected:
+      totalHitCount += shard.totalHits;
       if (shard.scoreDocs != null && shard.scoreDocs.length > 0) {
-        totalHitCount += shard.totalHits;
         availHitCount += shard.scoreDocs.length;
         queue.add(new ShardRef(shardIDX));
         maxScore = Math.max(maxScore, shard.getMaxScore());
@@ -225,6 +227,10 @@
       }
     }
 
+    if (availHitCount == 0) {
+      maxScore = Float.NaN;
+    }
+
     final ScoreDoc[] hits = new ScoreDoc[Math.min(topN, availHitCount)];
     int hitUpto = 0;
 
Index: lucene/src/java/org/apache/lucene/search/ScoreDoc.java
===================================================================
--- lucene/src/java/org/apache/lucene/search/ScoreDoc.java	(revision 1214418)
+++ lucene/src/java/org/apache/lucene/search/ScoreDoc.java	(working copy)
@@ -46,6 +46,6 @@
   // A convenience method for debugging.
   @Override
   public String toString() {
-    return "doc=" + doc + " score=" + score;
+    return "doc=" + doc + " score=" + score + " shardIndex=" + shardIndex;
  }
 }
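TopDocs.merge records each hit's originating shard in ScoreDoc.shardIndex while leaving doc shard-local, so the amended toString above makes merged hits self-describing when debugging. A sketch under that assumption (class and method names are illustrative):

import java.io.IOException;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

public class MergeDebugSketch {
  public static void dump(TopDocs[] perShardHits) throws IOException {
    final TopDocs merged = TopDocs.merge(null, 10, perShardHits);
    for (ScoreDoc hit : merged.scoreDocs) {
      // Prints e.g. "doc=17 score=2.31 shardIndex=3"; doc is the
      // shard-local docID, so shardIndex is required to resolve it:
      System.out.println(hit);
    }
  }
}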
Index: lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java
===================================================================
--- lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java	(revision 0)
+++ lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java	(working copy)
@@ -0,0 +1,575 @@
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TermContext;
+
+// TODO
+//   - doc blocks?  so we can test joins/grouping...
+//   - controlled consistency (NRTMgr)
+
+public abstract class ShardSearchingTestBase extends LuceneTestCase {
+
+  // TODO: maybe SLM should throw this instead of returning null...
+  public static class SearcherExpiredException extends RuntimeException {
+    public SearcherExpiredException(String message) {
+      super(message);
+    }
+  }
+
+  private static class FieldAndShardVersion {
+    private final long version;
+    private final int nodeID;
+    private final String field;
+
+    public FieldAndShardVersion(int nodeID, long version, String field) {
+      this.nodeID = nodeID;
+      this.version = version;
+      this.field = field;
+    }
+
+    @Override
+    public int hashCode() {
+      return (int) (version * nodeID + field.hashCode());
+    }
+
+    @Override
+    public boolean equals(Object _other) {
+      if (!(_other instanceof FieldAndShardVersion)) {
+        return false;
+      }
+
+      final FieldAndShardVersion other = (FieldAndShardVersion) _other;
+
+      return field.equals(other.field) && version == other.version && nodeID == other.nodeID;
+    }
+
+    @Override
+    public String toString() {
+      return "FieldAndShardVersion(field=" + field + " nodeID=" + nodeID + " version=" + version + ")";
+    }
+  }
+
+  private static class TermAndShardVersion {
+    private final long version;
+    private final int nodeID;
+    private final Term term;
+
+    public TermAndShardVersion(int nodeID, long version, Term term) {
+      this.nodeID = nodeID;
+      this.version = version;
+      this.term = term;
+    }
+
+    @Override
+    public int hashCode() {
+      return (int) (version * nodeID + term.hashCode());
+    }
+
+    @Override
+    public boolean equals(Object _other) {
+      if (!(_other instanceof TermAndShardVersion)) {
+        return false;
+      }
+
+      final TermAndShardVersion other = (TermAndShardVersion) _other;
+
+      return term.equals(other.term) && version == other.version && nodeID == other.nodeID;
+    }
+  }
+
+  // We share collection stats for these fields on each node
+  // reopen:
+  private final String[] fieldsToShare = new String[] {"body", "title"};
+
+  // Called by one node once it has reopened, to notify all
+  // other nodes.  This is just a mock (since it goes and
+  // directly updates all other nodes, in RAM)... in a real
+  // env this would hit the wire, sending version &
+  // collection stats to all other nodes:
+  void broadcastNodeReopen(int nodeID, long version, IndexSearcher newSearcher) throws IOException {
+
+    if (VERBOSE) {
+      System.out.println("REOPEN: nodeID=" + nodeID + " version=" + version + " maxDoc=" + newSearcher.getIndexReader().maxDoc());
+    }
+
+    // Broadcast new collection stats for this node to all
+    // other nodes:
+    for(String field : fieldsToShare) {
+      final CollectionStatistics stats = newSearcher.collectionStatistics(field);
+      for (NodeState node : nodes) {
+        // Don't put my own collection stats into the cache;
+        // we pull locally:
+        if (node.myNodeID != nodeID) {
+          node.collectionStatsCache.put(new FieldAndShardVersion(nodeID, version, field), stats);
+        }
+      }
+    }
+    for (NodeState node : nodes) {
+      node.updateNodeVersion(nodeID, version);
+    }
+  }
+
+  // MOCK: in a real env you have to hit the wire
+  // (send this query to all remote nodes
+  // concurrently):
+  TopDocs searchNode(int nodeID, long[] nodeVersions, Query q, Sort sort, int numHits, ScoreDoc searchAfter) throws IOException {
+    final NodeState.ShardIndexSearcher s = nodes[nodeID].acquire(nodeVersions);
+    try {
+      if (sort == null) {
+        if (searchAfter != null) {
+          return s.localSearchAfter(searchAfter, q, numHits);
+        } else {
+          return s.localSearch(q, numHits);
+        }
+      } else {
+        assert searchAfter == null;  // not supported yet
+        return s.localSearch(q, numHits, sort);
+      }
+    } finally {
+      nodes[nodeID].release(s);
+    }
+  }
+
+  // Mock: in a real env, this would hit the wire and get
+  // term stats from the remote node:
+  Map<Term,TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID, long version) throws IOException {
+    final NodeState node = nodes[nodeID];
+    final Map<Term,TermStatistics> stats = new HashMap<Term,TermStatistics>();
+    final IndexSearcher s = node.searchers.acquire(version);
+    if (s == null) {
+      throw new SearcherExpiredException("node=" + nodeID + " version=" + version);
+    }
+    try {
+      for(Term term : terms) {
+        final TermContext termContext = TermContext.build(s.getIndexReader().getTopReaderContext(), term, false);
+        stats.put(term, s.termStatistics(term, termContext));
+      }
+    } finally {
+      node.searchers.release(s);
+    }
+    return stats;
+  }
+
+  protected final class NodeState implements Closeable {
+    public final Directory dir;
+    public final IndexWriter writer;
+    public final SearcherLifetimeManager searchers;
+    public final SearcherManager mgr;
+    public final int myNodeID;
+    public final long[] currentNodeVersions;
+
+    // TODO: nothing evicts from here!!!  Somehow, on searcher
+    // expiration on remote nodes we must evict from our
+    // local cache...?
+
+    private final Map<FieldAndShardVersion,CollectionStatistics> collectionStatsCache = new ConcurrentHashMap<FieldAndShardVersion,CollectionStatistics>();
+    private final Map<TermAndShardVersion,TermStatistics> termStatsCache = new ConcurrentHashMap<TermAndShardVersion,TermStatistics>();
+
+    /** Matches docs in the local shard but scores based on
+     *  aggregated stats ("mock distributed scoring") from all
+     *  nodes. */
+
+    public class ShardIndexSearcher extends IndexSearcher {
+      // Versions of the node searchers we search:
+      public final long[] nodeVersions;
+      public final int myNodeID;
+
+      public ShardIndexSearcher(long[] nodeVersions, IndexReader localReader, int nodeID) {
+        super(localReader);
+        this.nodeVersions = nodeVersions;
+        myNodeID = nodeID;
+        assert myNodeID == NodeState.this.myNodeID: "myNodeID=" + nodeID + " NodeState.this.myNodeID=" + NodeState.this.myNodeID;
+      }
+
+      @Override
+      public Query rewrite(Query original) throws IOException {
+        final Query rewritten = super.rewrite(original);
+        final Set<Term> terms = new HashSet<Term>();
+        rewritten.extractTerms(terms);
+
+        // Make a single request to each remote node for its
+        // term stats:
+        for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
+          if (nodeID == myNodeID) {
+            continue;
+          }
+
+          final Set<Term> missing = new HashSet<Term>();
+          for(Term term : terms) {
+            final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term);
+            if (!termStatsCache.containsKey(key)) {
+              missing.add(term);
+            }
+          }
+          if (missing.size() != 0) {
+            for(Map.Entry<Term,TermStatistics> ent : getNodeTermStats(missing, nodeID, nodeVersions[nodeID]).entrySet()) {
+              final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], ent.getKey());
+              termStatsCache.put(key, ent.getValue());
+            }
+          }
+        }
+
+        return rewritten;
+      }
+
+      @Override
+      public TermStatistics termStatistics(Term term, TermContext context) throws IOException {
+        assert term != null;
+        int docFreq = 0;
+        long totalTermFreq = 0;
+        for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
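The section is truncated here, but from the accumulators above, termStatistics evidently goes on to sum each node's docFreq and totalTermFreq so that local matching scores with global statistics. A standalone sketch of that aggregation (the class and method names are illustrative, and per-node stats are passed in directly instead of being read from termStatsCache):

import org.apache.lucene.search.TermStatistics;

public class AggregateTermStatsSketch {
  // Sum per-node stats for one term into a single global view:
  public static TermStatistics sum(TermStatistics[] perNode) {
    long docFreq = 0;
    long totalTermFreq = 0;
    for (TermStatistics nodeStats : perNode) {
      docFreq += nodeStats.docFreq();
      totalTermFreq += nodeStats.totalTermFreq();
    }
    // Every entry describes the same term, so reuse the first entry's
    // term bytes:
    return new TermStatistics(perNode[0].term(), docFreq, totalTermFreq);
  }
}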