Index: lucene/contrib/grouping/src/test/org/apache/lucene/search/grouping/TestCachingCollector.java
===================================================================
--- lucene/contrib/grouping/src/test/org/apache/lucene/search/grouping/TestCachingCollector.java	(revision 0)
+++ lucene/contrib/grouping/src/test/org/apache/lucene/search/grouping/TestCachingCollector.java	(revision 0)
@@ -0,0 +1,168 @@
+package org.apache.lucene.search.grouping;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestCachingCollector extends LuceneTestCase {
+
+  private static final double ONE_BYTE = 1.0 / (1024 * 1024); // 1 byte, expressed in MB
+
+  private static class MockScorer extends Scorer {
+
+    private MockScorer() {
+      super((Weight) null);
+    }
+
+    @Override
+    public float score() throws IOException { return 0; }
+
+    @Override
+    public int docID() { return 0; }
+
+    @Override
+    public int nextDoc() throws IOException { return 0; }
+
+    @Override
+    public int advance(int target) throws IOException { return 0; }
+
+  }
+
+  private static class NoOpCollector extends Collector {
+
+    private final boolean acceptDocsOutOfOrder;
+
+    public NoOpCollector(boolean acceptDocsOutOfOrder) {
+      this.acceptDocsOutOfOrder = acceptDocsOutOfOrder;
+    }
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {}
+
+    @Override
+    public void collect(int doc) throws IOException {}
+
+    @Override
+    public void setNextReader(IndexReader reader, int docBase) throws IOException {}
+
+    @Override
+    public boolean acceptsDocsOutOfOrder() {
+      return acceptDocsOutOfOrder;
+    }
+
+  }
+
+  public void testBasic() throws Exception {
+    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 1);
+    cc.setScorer(new MockScorer());
+
+    // collect 1000 docs
+    for (int i = 0; i < 1000; i++) {
+      cc.collect(i);
+    }
+
+    // now replay them
+    cc.replay(new Collector() {
+      int prevDocID = -1;
+
+      @Override
+      public void setScorer(Scorer scorer) throws IOException {}
+
+      @Override
+      public void setNextReader(IndexReader reader, int docBase) throws IOException {}
+
+      @Override
+      public void collect(int doc) throws IOException {
+        assertEquals(prevDocID + 1, doc);
+        prevDocID = doc;
+      }
+
+      @Override
+      public boolean acceptsDocsOutOfOrder() {
+        return false;
+      }
+    });
+  }
+
+  public void testIllegalStateOnReplay() throws Exception {
+    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 50 * ONE_BYTE);
+    cc.setScorer(new MockScorer());
+
+    // collect 130 docs; this should be enough to trigger a cache abort
+    for (int i = 0; i < 130; i++) {
+      cc.collect(i);
+    }
+
+    assertFalse("CachingCollector should not be cached due to low memory limit", cc.isCached());
+
+    try {
+      cc.replay(new NoOpCollector(false));
+      fail("replay should fail if CachingCollector is not cached");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+  }
+
+  public void testIllegalCollectorOnReplay() throws Exception {
+    // tests that the Collector passed to replay() is compatible, in terms of
+    // out-of-order collection, with the Collector passed to the ctor
+
+    // 'src' Collector does not support out-of-order
+    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 50 * ONE_BYTE);
+    cc.setScorer(new MockScorer());
+    for (int i = 0; i < 10; i++) cc.collect(i);
+    cc.replay(new NoOpCollector(true)); // this call should not fail
+    cc.replay(new NoOpCollector(false)); // this call should not fail
+
+    // 'src' Collector supports out-of-order
+    cc = new CachingCollector(new NoOpCollector(true), true, 50 * ONE_BYTE);
+    cc.setScorer(new MockScorer());
+    for (int i = 0; i < 10; i++) cc.collect(i);
+    cc.replay(new NoOpCollector(true)); // this call should not fail
+    try {
+      cc.replay(new NoOpCollector(false)); // this call should fail
+      fail("should have failed if an in-order Collector was given to replay(), " +
+          "while CachingCollector was initialized with out-of-order collection");
+    } catch (IllegalArgumentException e) {
+      // ok
+    }
+  }
+
+  public void testCachedArraysAllocation() throws Exception {
+    // tests the cached arrays allocation -- if 'nextLength' were computed too
+    // high, caching would terminate even though a smaller length would suffice
+
+    // set the RAM limit large enough for 150 docs + random(10000)
+    int numDocs = random.nextInt(10000) + 150;
+    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 8 * ONE_BYTE * numDocs);
+    cc.setScorer(new MockScorer());
+    for (int i = 0; i < numDocs; i++) cc.collect(i);
+    assertTrue(cc.isCached());
+
+    // collecting one more document should abort caching, since the RAM limit
+    // allows exactly numDocs documents
+    cc.collect(numDocs);
+    assertFalse(cc.isCached());
+  }
+
+}

Property changes on: lucene/contrib/grouping/src/test/org/apache/lucene/search/grouping/TestCachingCollector.java
___________________________________________________________________
Added: svn:executable
   + *
Added: svn:eol-style
   + native

Index: lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/CachingCollector.java
===================================================================
--- lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/CachingCollector.java	(revision 1103598)
+++ lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/CachingCollector.java	(working copy)
@@ -24,7 +24,7 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.Similarity;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.RamUsageEstimator;
 
 /**
@@ -42,6 +42,9 @@
  * set is large this can easily be a very substantial amount
  * of RAM!
  *
+ * <p><b>NOTE</b>: this class caches at least 128 documents
+ * before checking RAM limits.
+ *
  * <p>See {@link org.apache.lucene.search.grouping} for more
  * details including a full code example.
  *
@@ -49,6 +52,10 @@
  */
 public class CachingCollector extends Collector {
 
+  // Max out at 512K arrays
+  private static final int MAX_ARRAY_SIZE = 512 * 1024;
+  private static final int INITIAL_ARRAY_SIZE = 128;
+
   private static class SegStart {
     public final IndexReader reader;
     public final int base;
@@ -60,14 +67,42 @@
       this.end = end;
     }
   }
+
+  private static class CachedScorer extends Scorer {
+
+    // NOTE: these members are package-private so that accessing them from the
+    // enclosing class does not incur a JVM access check; the same overhead
+    // would apply if they were private members of this inner class.
+    int doc;
+    float score;
+
+    private CachedScorer() { super((Weight) null); }
+
+    @Override
+    public float score() { return score; }
+
+    @Override
+    public int advance(int target) { throw new UnsupportedOperationException(); }
+
+    @Override
+    public int docID() { return doc; }
+
+    @Override
+    public float freq() { throw new UnsupportedOperationException(); }
+
+    @Override
+    public int nextDoc() { throw new UnsupportedOperationException(); }
+  }
+
   // TODO: would be nice if a collector defined a
   // needsScores() method so we can specialize / do checks
   // up front:
   private final Collector other;
   private final int maxDocsToCache;
-  private final Scorer cachedScorer;
+  private final boolean cacheScores;
+  private final CachedScorer cachedScorer;
   private final List<int[]> cachedDocs;
   private final List<float[]> cachedScores;
   private final List<SegStart> cachedSegs = new ArrayList<SegStart>();
@@ -77,40 +112,14 @@
   private int[] curDocs;
   private float[] curScores;
   private int upto;
   private IndexReader lastReader;
-  private float score;
   private int base;
   private int lastDocBase;
-  private int doc;
 
   public CachingCollector(Collector other, boolean cacheScores, double maxRAMMB) {
     this.other = other;
+    this.cacheScores = cacheScores;
     if (cacheScores) {
-      cachedScorer = new Scorer((Similarity) null) {
-        @Override
-        public float score() {
-          return score;
-        }
-
-        @Override
-        public int advance(int target) {
-          throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public int docID() {
-          return doc;
-        }
-
-        @Override
-        public float freq() {
-          throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public int nextDoc() {
-          throw new UnsupportedOperationException();
-        }
-      };
+      cachedScorer = new CachedScorer();
       cachedScores = new ArrayList<float[]>();
       curScores = new float[128];
       cachedScores.add(curScores);
@@ -119,16 +128,14 @@
       cachedScores = null;
     }
     cachedDocs = new ArrayList<int[]>();
-    curDocs = new int[128];
+    curDocs = new int[INITIAL_ARRAY_SIZE];
     cachedDocs.add(curDocs);
 
-    final int bytesPerDoc;
-    if (curScores != null) {
-      bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT + RamUsageEstimator.NUM_BYTES_FLOAT;
-    } else {
-      bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
+    int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
+    if (cacheScores) {
+      bytesPerDoc += RamUsageEstimator.NUM_BYTES_FLOAT;
     }
-    maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024)/bytesPerDoc);
+    maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
   }
 
   @Override
@@ -147,52 +154,60 @@
 
     if (curDocs == null) {
       // Cache was too large
-      if (curScores != null) {
-        score = scorer.score();
+      if (cacheScores) {
+        cachedScorer.score = scorer.score();
       }
-      this.doc = doc;
+      cachedScorer.doc = doc;
       other.collect(doc);
       return;
    }
 
+    // Allocate a bigger array or abort caching
     if (upto == curDocs.length) {
       base += upto;
-      final int nextLength;
-      // Max out at 512K arrays:
-      if (curDocs.length < 524288) {
-        nextLength = 8*curDocs.length;
-      } else {
-        nextLength = curDocs.length;
+
+      // Compute the next array length; don't allocate arrays that are too big
+      int nextLength = 8*curDocs.length;
+      if (nextLength > MAX_ARRAY_SIZE) {
+        nextLength = MAX_ARRAY_SIZE;
       }
+
       if (base + nextLength > maxDocsToCache) {
-        // Too many docs to collect -- clear cache
-        curDocs = null;
-        if (curScores != null) {
-          score = scorer.score();
+        // try to allocate a smaller array
+        nextLength = maxDocsToCache - base;
+        if (nextLength <= 0) {
+          // Too many docs to collect -- clear cache
+          curDocs = null;
+          curScores = null;
+          cachedSegs.clear();
+          cachedDocs.clear();
+          cachedScores.clear();
+          if (cacheScores) {
+            cachedScorer.score = scorer.score();
+          }
+          cachedScorer.doc = doc;
+          other.collect(doc);
+          return;
         }
-        this.doc = doc;
-        other.collect(doc);
-        cachedDocs.clear();
-        cachedScores.clear();
-        return;
       }
+
       curDocs = new int[nextLength];
       cachedDocs.add(curDocs);
-      if (curScores != null) {
+      if (cacheScores) {
         curScores = new float[nextLength];
         cachedScores.add(curScores);
       }
       upto = 0;
     }
+
     curDocs[upto] = doc;
     // TODO: maybe specialize private subclass so we don't
     // null check per collect...
-    if (curScores != null) {
-      score = curScores[upto] = scorer.score();
+    if (cacheScores) {
+      cachedScorer.score = curScores[upto] = scorer.score();
     }
     upto++;
-    this.doc = doc;
+    cachedScorer.doc = doc;
     other.collect(doc);
   }
 
@@ -204,7 +219,7 @@
   public void setNextReader(IndexReader reader, int docBase) throws IOException {
     other.setNextReader(reader, docBase);
     if (lastReader != null) {
-      cachedSegs.add(new SegStart(lastReader, lastDocBase, base+upto));
+      cachedSegs.add(new SegStart(lastReader, lastDocBase, base + upto));
     }
     lastDocBase = docBase;
     lastReader = reader;
@@ -215,50 +230,62 @@
   @Override
   public String toString() {
     if (isCached()) {
-      return "CachingCollector (" + (base+upto) + " docs " + (curScores != null ? " & scores" : "") + " cached)";
+      return "CachingCollector (" + (base+upto) + " docs " + (cacheScores ? " & scores" : "") + " cached)";
     } else {
       return "CachingCollector (cache was cleared)";
     }
   }
 
+  /**
+   * Replays the cached doc IDs (and scores) to the given Collector.
+   *
+   * @throws IllegalStateException
+   *           if this collector is not cached (i.e., if the RAM limits were too
+   *           low for the number of documents + scores to cache).
+   * @throws IllegalArgumentException
+   *           if the given Collector does not support out-of-order collection,
+   *           while the collector passed to the ctor does.
+   */
   public void replay(Collector other) throws IOException {
     if (!isCached()) {
       throw new IllegalStateException("cannot replay: cache was cleared because too much RAM was required");
     }
+
+    if (!other.acceptsDocsOutOfOrder() && this.other.acceptsDocsOutOfOrder()) {
+      throw new IllegalArgumentException(
+          "cannot replay: given collector does not support "
+              + "out-of-order collection, while the wrapped collector does. "
" + + "Therefore cached documents may be out-of-order."); + } + //System.out.println("CC: replay totHits=" + (upto + base)); if (lastReader != null) { cachedSegs.add(new SegStart(lastReader, lastDocBase, base+upto)); lastReader = null; } - final int uptoSav = upto; - final int baseSav = base; - try { - upto = 0; - base = 0; - int chunkUpto = 0; - other.setScorer(cachedScorer); - curDocs = EMPTY_INT_ARRAY; - for(SegStart seg : cachedSegs) { - other.setNextReader(seg.reader, seg.base); - while(base+upto < seg.end) { - if (upto == curDocs.length) { - base += curDocs.length; - curDocs = cachedDocs.get(chunkUpto); - if (curScores != null) { - curScores = cachedScores.get(chunkUpto); - } - chunkUpto++; - upto = 0; + + int curupto = 0; + int curbase = 0; + int chunkUpto = 0; + other.setScorer(cachedScorer); + curDocs = EMPTY_INT_ARRAY; + for (SegStart seg : cachedSegs) { + other.setNextReader(seg.reader, seg.base); + while (curbase + curupto < seg.end) { + if (curupto == curDocs.length) { + curbase += curDocs.length; + curDocs = cachedDocs.get(chunkUpto); + if (cacheScores) { + curScores = cachedScores.get(chunkUpto); } - if (curScores != null) { - score = curScores[upto]; - } - other.collect(curDocs[upto++]); + chunkUpto++; + curupto = 0; } + if (cacheScores) { + cachedScorer.score = curScores[curupto]; + } + other.collect(curDocs[curupto++]); } - } finally { - upto = uptoSav; - base = baseSav; } } }