Index: lucene/src/test/org/apache/lucene/search/TestShardSearching.java =================================================================== --- lucene/src/test/org/apache/lucene/search/TestShardSearching.java (revision 0) +++ lucene/src/test/org/apache/lucene/search/TestShardSearching.java (working copy) @@ -0,0 +1,183 @@ +package org.apache.lucene.search; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.MultiFields; +import org.apache.lucene.index.MultiReader; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util._TestUtil; + +// TODO +// - test searchAfter too, w/ sharding! + +public class TestShardSearching extends ShardSearchingTestBase { + + public void testSimple() throws Exception { + // TODO: randomize: + start(_TestUtil.getTempDir("TestShardSearching").toString(), + _TestUtil.nextInt(random, 1, 10), // numShards + 10.0*RANDOM_MULTIPLIER, + _TestUtil.nextInt(random, 1, 4) // maxSearcherAgeSeconds + ); + final List lastVersions = new ArrayList(); + List terms = null; + while (System.nanoTime() < endTimeNanos) { + final SearcherAndVersion[] searchers; + // TODO: make this smart, not just lastVersion, but eg + // save some really old versions and rerun, so that we + // sometimes hit timeout + + final boolean doFollowon = lastVersions.size() > 0 && random.nextInt(7) == 1; + + if (doFollowon) { + final long[] lastVersion = lastVersions.get(random.nextInt(lastVersions.size())); + // Do followon query; could be "old"! + try { + searchers = acquire(lastVersion); + } catch (SearcherExpiredException see) { + // Expected, sometimes; in a "real" app we would + // either forward this error to the user ("too + // much time has passed; please re-run your + // search") or sneakily just switch to newest + // searcher w/o telling them... + lastVersions.remove(lastVersion); + continue; + } + } else { + // Do fresh query: + searchers = acquire(); + } + + final long[] shardVersions = justVersion(searchers); + + try { + + // Make single (MultiReader) from all searchers: + int docCount = 0; + final IndexReader[] subs = new IndexReader[searchers.length]; + for(int subID=0;subID 10) { + // TODO: try to "focus" on high freq terms sometimes too + // TODO: maybe also periodically reset the terms...? + final TermsEnum termsEnum = MultiFields.getTerms(singleReader, "body").iterator(null); + terms = new ArrayList(); + while(termsEnum.next() != null) { + terms.add(BytesRef.deepCopyOf(termsEnum.term())); + } + System.out.println(terms.size() + " terms"); + if (terms.size() == 0) { + terms = null; + } + } + + if (terms != null) { + TermQuery query = new TermQuery(new Term("body", terms.get(random.nextInt(terms.size())))); + + try { + assertSame(singleSearcher, searchers, shardVersions, query); + } catch (SearcherExpiredException see) { + // Expected, sometimes; in a "real" app we would + // either forward this error to the user ("too + // much time has passed; please re-run your + // search") or sneakily just switch to newest + // searcher w/o telling them... + assert lastVersions.indexOf(shardVersions) != -1; + lastVersions.remove(shardVersions); + } + } + } finally { + release(searchers); + } + + if (!doFollowon && random.nextInt(50) == 17) { + lastVersions.add(shardVersions); + if (lastVersions.size() > 200) { + Collections.shuffle(lastVersions, random); + lastVersions.subList(100, lastVersions.size()).clear(); + } + } + } + + finish(); + } + + private void assertSame(IndexSearcher singleSearcher, SearcherAndVersion[] shardSearchers, long[] shardVersions, Query q) throws IOException { + final int numHits = _TestUtil.nextInt(random, 1, 100); + + // Single searcher: + TopDocs hits = singleSearcher.search(q, numHits); + + // Sharded searcher: + final TopDocs[] shardHits = new TopDocs[shardSearchers.length]; + for(int shardID=0;shardID= 0) { + indices[shardID].searchers.release(searchers[shardID].searcher); + } + throw new SearcherExpiredException("shardID=" + shardID + " version=" + version); + } + searchers[shardID] = new SearcherAndVersion(s, version); + } + + return searchers; + } + + protected void release(SearcherAndVersion[] searchers) throws IOException { + for(int shardID=0;shardID collectionStatsCache = new ConcurrentHashMap(); + + // NOTE: in a "real" shard env this hits the wire to talk + // to the remote node. We mock that up here by just + // querying our "local" shards. Also, we would pass + // Set fields, not one field at a time...: + CollectionStatistics getShardCollectionStats(int shardID, long version, String field) throws IOException { + final FieldAndShardVersion key = new FieldAndShardVersion(shardID, version, field); + CollectionStatistics stats = collectionStatsCache.get(key); + if (stats != null) { + // Cache hit + return stats; + } + + // Cache miss + final IndexSearcher s = indices[shardID].searchers.acquire(version); + if (s == null) { + throw new SearcherExpiredException("shardID=" + shardID + " version=" + version); + } + try { + stats = s.collectionStatistics(field); + } finally { + indices[shardID].searchers.release(s); + } + collectionStatsCache.put(key, stats); + return stats; + } + + private static class TermAndShardVersion { + private final long version; + private final int shardID; + private final Term term; + + public TermAndShardVersion(int shardID, long version, Term term) { + this.shardID = shardID; + this.version = version; + this.term = term; + } + + @Override + public int hashCode() { + return (int) (version * shardID + term.hashCode()); + } + + @Override + public boolean equals(Object _other) { + if (!(_other instanceof TermAndShardVersion)) { + return false; + } + + final TermAndShardVersion other = (TermAndShardVersion) _other; + + return term.equals(other.term) && version == other.version && shardID == other.shardID; + } + } + + // TODO: nothing evicts from here!!! + private final Map termStatsCache = new ConcurrentHashMap(); + + // NOTE: in a "real" shard env this hits the wire to talk + // to the remote node. We mock that up here by just + // querying our "local" shards: + TermStatistics getShardTermStats(int shardID, long version, Term term, TermContext termContext) throws IOException { + final TermAndShardVersion key = new TermAndShardVersion(shardID, version, term); + TermStatistics stats = termStatsCache.get(key); + if (stats != null) { + // Cache hit + return stats; + } + + // Cache miss + final IndexSearcher s = indices[shardID].searchers.acquire(version); + if (s == null) { + throw new SearcherExpiredException("shardID=" + shardID + " version=" + version); + } + try { + // nocommit: is this right??? + if (termContext == null) { + termContext = TermContext.build(s.getIndexReader().getTopReaderContext(), term, false); + } + stats = s.termStatistics(term, termContext); + } finally { + indices[shardID].searchers.release(s); + } + + termStatsCache.put(key, stats); + return stats; + } + + + /** Matches docs in the local shard but scores based on + * aggregated stats ("mock distributed scoring") from all + * shards, pull into a shared cache. */ + + protected class MockSlowShardIndexSearcher extends IndexSearcher { + // Version for the sub searchers we search: + private final long[] subVersions; + private final int shardID; + + public MockSlowShardIndexSearcher(long[] subVersions, IndexReader localReader, int shardID) { + super(localReader); + this.subVersions = subVersions; + this.shardID = shardID; + + // TODO: here we should pre-cache the collection stats + // for fields we "know" we will be querying... + } + + @Override + public TermStatistics termStatistics(Term term, TermContext context) throws IOException { + assert term != null; + int docFreq = 0; + long totalTermFreq = 0; + for(int shardID=0;shardID