Index: lucene/contrib/grouping/src/test/org/apache/lucene/search/grouping/TestGrouping.java
===================================================================
--- lucene/contrib/grouping/src/test/org/apache/lucene/search/grouping/TestGrouping.java	(revision 1104151)
+++ lucene/contrib/grouping/src/test/org/apache/lucene/search/grouping/TestGrouping.java	(working copy)
@@ -441,7 +441,7 @@
         if (VERBOSE) {
           System.out.println("TEST: maxCacheMB=" + maxCacheMB);
         }
-        c = cCache = new CachingCollector(c1, true, maxCacheMB);
+        c = cCache = CachingCollector.create(c1, true, maxCacheMB);
       } else {
         c = c1;
         cCache = null;
Index: lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/package.html
===================================================================
--- lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/package.html	(revision 1104151)
+++ lucene/contrib/grouping/src/java/org/apache/lucene/search/grouping/package.html	(working copy)
@@ -73,7 +73,7 @@
 
   boolean cacheScores = true;
   double maxCacheRAMMB = 4.0;
-  CachingCollector cachedCollector = new CachingCollector(c1, cacheScores, maxCacheRAMMB);
+  CachingCollector cachedCollector = CachingCollector.create(c1, cacheScores, maxCacheRAMMB);
   s.search(new TermQuery(new Term("content", searchTerm)), cachedCollector);
 
   Collection topGroups = c1.getTopGroups(groupOffset, fillFields);
Index: lucene/src/test/org/apache/lucene/search/TestCachingCollector.java
===================================================================
--- lucene/src/test/org/apache/lucene/search/TestCachingCollector.java	(revision 1104151)
+++ lucene/src/test/org/apache/lucene/search/TestCachingCollector.java	(working copy)
@@ -75,39 +75,41 @@
   }
 
   public void testBasic() throws Exception {
-    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 1);
-    cc.setScorer(new MockScorer());
-
-    // collect 1000 docs
-    for (int i = 0; i < 1000; i++) {
-      cc.collect(i);
-    }
-
-    // now replay them
-    cc.replay(new Collector() {
-      int prevDocID = -1;
+    for (boolean cacheScores : new boolean[] { false, true }) {
+      CachingCollector cc = CachingCollector.create(new NoOpCollector(false), cacheScores, 1);
+      cc.setScorer(new MockScorer());
 
-      @Override
-      public void setScorer(Scorer scorer) throws IOException {}
-
-      @Override
-      public void setNextReader(IndexReader reader, int docBase) throws IOException {}
-
-      @Override
-      public void collect(int doc) throws IOException {
-        assertEquals(prevDocID + 1, doc);
-        prevDocID = doc;
+      // collect 1000 docs
+      for (int i = 0; i < 1000; i++) {
+        cc.collect(i);
       }
-
-      @Override
-      public boolean acceptsDocsOutOfOrder() {
-        return false;
-      }
-    });
+
+      // now replay them
+      cc.replay(new Collector() {
+        int prevDocID = -1;
+
+        @Override
+        public void setScorer(Scorer scorer) throws IOException {}
+
+        @Override
+        public void setNextReader(IndexReader reader, int docBase) throws IOException {}
+
+        @Override
+        public void collect(int doc) throws IOException {
+          assertEquals(prevDocID + 1, doc);
+          prevDocID = doc;
+        }
+
+        @Override
+        public boolean acceptsDocsOutOfOrder() {
+          return false;
+        }
+      });
+    }
   }
 
   public void testIllegalStateOnReplay() throws Exception {
-    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 50 * ONE_BYTE);
+    CachingCollector cc = CachingCollector.create(new NoOpCollector(false), true, 50 * ONE_BYTE);
     cc.setScorer(new MockScorer());
 
     // collect 130 docs, this should be enough for triggering cache abort.
@@ -130,14 +132,14 @@
     // is valid with the Collector passed to the ctor
 
     // 'src' Collector does not support out-of-order
-    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 50 * ONE_BYTE);
+    CachingCollector cc = CachingCollector.create(new NoOpCollector(false), true, 50 * ONE_BYTE);
     cc.setScorer(new MockScorer());
     for (int i = 0; i < 10; i++) cc.collect(i);
     cc.replay(new NoOpCollector(true)); // this call should not fail
     cc.replay(new NoOpCollector(false)); // this call should not fail
 
     // 'src' Collector supports out-of-order
-    cc = new CachingCollector(new NoOpCollector(true), true, 50 * ONE_BYTE);
+    cc = CachingCollector.create(new NoOpCollector(true), true, 50 * ONE_BYTE);
     cc.setScorer(new MockScorer());
     for (int i = 0; i < 10; i++) cc.collect(i);
     cc.replay(new NoOpCollector(true)); // this call should not fail
@@ -156,14 +158,18 @@
     // set RAM limit enough for 150 docs + random(10000)
     int numDocs = random.nextInt(10000) + 150;
-    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 8 * ONE_BYTE * numDocs);
-    cc.setScorer(new MockScorer());
-    for (int i = 0; i < numDocs; i++) cc.collect(i);
-    assertTrue(cc.isCached());
-
-    // The 151's document should terminate caching
-    cc.collect(numDocs);
-    assertFalse(cc.isCached());
+    for (boolean cacheScores : new boolean[] { false, true }) {
+      int bytesPerDoc = cacheScores ? 8 : 4;
+      CachingCollector cc = CachingCollector.create(new NoOpCollector(false),
+          cacheScores, bytesPerDoc * ONE_BYTE * numDocs);
+      cc.setScorer(new MockScorer());
+      for (int i = 0; i < numDocs; i++) cc.collect(i);
+      assertTrue(cc.isCached());
+
+      // One more document should terminate caching
+      cc.collect(numDocs);
+      assertFalse(cc.isCached());
+    }
   }
 }
Index: lucene/src/java/org/apache/lucene/search/CachingCollector.java
===================================================================
--- lucene/src/java/org/apache/lucene/search/CachingCollector.java	(revision 1104151)
+++ lucene/src/java/org/apache/lucene/search/CachingCollector.java	(working copy)
@@ -47,7 +47,7 @@
  *
  * @lucene.experimental
  */
-public class CachingCollector extends Collector {
+public abstract class CachingCollector extends Collector {
 
   // Max out at 512K arrays
   private static final int MAX_ARRAY_SIZE = 512 * 1024;
@@ -66,7 +66,7 @@
     }
   }
 
-  private static class CachedScorer extends Scorer {
+  private static final class CachedScorer extends Scorer {
 
     // NOTE: these members are package-private b/c that way accessing them from
     // the outer class does not incur access check by the JVM. The same
@@ -78,137 +78,261 @@
 
     private CachedScorer() { super((Weight) null); }
 
     @Override
-    public float score() { return score; }
+    public final float score() { return score; }
 
     @Override
-    public int advance(int target) { throw new UnsupportedOperationException(); }
+    public final int advance(int target) { throw new UnsupportedOperationException(); }
 
     @Override
-    public int docID() { return doc; }
+    public final int docID() { return doc; }
 
     @Override
-    public float freq() { throw new UnsupportedOperationException(); }
+    public final float freq() { throw new UnsupportedOperationException(); }
 
     @Override
-    public int nextDoc() { throw new UnsupportedOperationException(); }
+    public final int nextDoc() { throw new UnsupportedOperationException(); }
   }
 
-  // TODO: would be nice if a collector defined a
-  // needsScores() method so we can specialize / do checks
-  // up front:
-  private final Collector other;
-  private final int maxDocsToCache;
+  // A CachingCollector which caches scores
+  private static final class ScoreCachingCollector extends CachingCollector {
 
-  private final boolean cacheScores;
-  private final CachedScorer cachedScorer;
-  private final List<int[]> cachedDocs;
-  private final List<float[]> cachedScores;
-  private final List<SegStart> cachedSegs = new ArrayList<SegStart>();
+    private final CachedScorer cachedScorer;
+    private final List<float[]> cachedScores;
 
-  private Scorer scorer;
-  private int[] curDocs;
-  private float[] curScores;
-  private int upto;
-  private IndexReader lastReader;
-  private int base;
-  private int lastDocBase;
+    private Scorer scorer;
+    private float[] curScores;
 
-  public CachingCollector(Collector other, boolean cacheScores, double maxRAMMB) {
-    this.other = other;
-    this.cacheScores = cacheScores;
-    if (cacheScores) {
+    ScoreCachingCollector(Collector other, double maxRAMMB) {
+      super(other, maxRAMMB, true);
+      cachedScorer = new CachedScorer();
       cachedScores = new ArrayList<float[]>();
       curScores = new float[128];
       cachedScores.add(curScores);
-    } else {
-      cachedScorer = null;
-      cachedScores = null;
     }
-    cachedDocs = new ArrayList<int[]>();
-    curDocs = new int[INITIAL_ARRAY_SIZE];
-    cachedDocs.add(curDocs);
+
+    @Override
+    public void collect(int doc) throws IOException {
-    int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
-    if (cacheScores) {
-      bytesPerDoc += RamUsageEstimator.NUM_BYTES_FLOAT;
-    }
-    maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
-  }
-
-  @Override
-  public void setScorer(Scorer scorer) throws IOException {
-    this.scorer = scorer;
-    other.setScorer(cachedScorer);
-  }
+      if (curDocs == null) {
+        // Cache was too large
+        cachedScorer.score = scorer.score();
+        cachedScorer.doc = doc;
+        other.collect(doc);
+        return;
+      }
 
-  @Override
-  public boolean acceptsDocsOutOfOrder() {
-    return other.acceptsDocsOutOfOrder();
-  }
+      // Allocate a bigger array or abort caching
+      if (upto == curDocs.length) {
+        base += upto;
+
+        // Compute next array length - don't allocate too big arrays
+        int nextLength = 8*curDocs.length;
+        if (nextLength > MAX_ARRAY_SIZE) {
+          nextLength = MAX_ARRAY_SIZE;
+        }
 
-  @Override
-  public void collect(int doc) throws IOException {
-
-    if (curDocs == null) {
-      // Cache was too large
-      if (cacheScores) {
-        cachedScorer.score = scorer.score();
+        if (base + nextLength > maxDocsToCache) {
+          // try to allocate a smaller array
+          nextLength = maxDocsToCache - base;
+          if (nextLength <= 0) {
+            // Too many docs to collect -- clear cache
+            curDocs = null;
+            curScores = null;
+            cachedSegs.clear();
+            cachedDocs.clear();
+            cachedScores.clear();
+            cachedScorer.score = scorer.score();
+            cachedScorer.doc = doc;
+            other.collect(doc);
+            return;
+          }
+        }
+
+        curDocs = new int[nextLength];
+        cachedDocs.add(curDocs);
+        curScores = new float[nextLength];
+        cachedScores.add(curScores);
+        upto = 0;
       }
+
+      curDocs[upto] = doc;
+      cachedScorer.score = curScores[upto] = scorer.score();
+      upto++;
       cachedScorer.doc = doc;
       other.collect(doc);
-  }
-    // Allocate a bigger array or abort caching
-    if (upto == curDocs.length) {
-      base += upto;
+    @Override
+    public void replay(Collector other) throws IOException {
+      replayInit(other);
-      // Compute next array length - don't allocate too big arrays
-      int nextLength = 8*curDocs.length;
-      if (nextLength > MAX_ARRAY_SIZE) {
-        nextLength = MAX_ARRAY_SIZE;
+      int curupto = 0;
+      int curbase = 0;
+      int chunkUpto = 0;
+      other.setScorer(cachedScorer);
+      curDocs = EMPTY_INT_ARRAY;
+      for (SegStart seg : cachedSegs) {
+        other.setNextReader(seg.reader, seg.base);
+        while (curbase + curupto < seg.end) {
+          if (curupto == curDocs.length) {
+            curbase += curDocs.length;
+            curDocs = cachedDocs.get(chunkUpto);
+            curScores = cachedScores.get(chunkUpto);
+            chunkUpto++;
+            curupto = 0;
+          }
+          cachedScorer.score = curScores[curupto];
+          other.collect(curDocs[curupto++]);
+        }
       }
-    if (base + nextLength > maxDocsToCache) {
-      // try to allocate a smaller array
-      nextLength = maxDocsToCache - base;
-      if (nextLength <= 0) {
-        // Too many docs to collect -- clear cache
-        curDocs = null;
-        curScores = null;
-        cachedSegs.clear();
-        cachedDocs.clear();
-        cachedScores.clear();
-        if (cacheScores) {
-          cachedScorer.score = scorer.score();
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+      this.scorer = scorer;
+      other.setScorer(cachedScorer);
+    }
+
+    @Override
+    public String toString() {
+      if (isCached()) {
+        return "CachingCollector (" + (base+upto) + " docs & scores cached)";
+      } else {
+        return "CachingCollector (cache was cleared)";
+      }
+    }
+
+  }
+
+  // A CachingCollector which does not cache scores
+  private static final class NoScoreCachingCollector extends CachingCollector {
+
+    NoScoreCachingCollector(Collector other, double maxRAMMB) {
+      super(other, maxRAMMB, false);
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+
+      if (curDocs == null) {
+        // Cache was too large
+        other.collect(doc);
+        return;
+      }
+
+      // Allocate a bigger array or abort caching
+      if (upto == curDocs.length) {
+        base += upto;
+
+        // Compute next array length - don't allocate too big arrays
+        int nextLength = 8*curDocs.length;
+        if (nextLength > MAX_ARRAY_SIZE) {
+          nextLength = MAX_ARRAY_SIZE;
+        }
+
+        if (base + nextLength > maxDocsToCache) {
+          // try to allocate a smaller array
+          nextLength = maxDocsToCache - base;
+          if (nextLength <= 0) {
+            // Too many docs to collect -- clear cache
+            curDocs = null;
+            cachedSegs.clear();
+            cachedDocs.clear();
+            other.collect(doc);
+            return;
           }
-        cachedScorer.doc = doc;
-        other.collect(doc);
-        return;
         }
+
+        curDocs = new int[nextLength];
+        cachedDocs.add(curDocs);
+        upto = 0;
       }
-      curDocs = new int[nextLength];
-      cachedDocs.add(curDocs);
-      if (cacheScores) {
-        curScores = new float[nextLength];
-        cachedScores.add(curScores);
+      curDocs[upto] = doc;
+      upto++;
+      other.collect(doc);
+    }
+
+    @Override
+    public void replay(Collector other) throws IOException {
+      replayInit(other);
+
+      int curupto = 0;
+      int curbase = 0;
+      int chunkUpto = 0;
+      curDocs = EMPTY_INT_ARRAY;
+      for (SegStart seg : cachedSegs) {
+        other.setNextReader(seg.reader, seg.base);
+        while (curbase + curupto < seg.end) {
+          if (curupto == curDocs.length) {
+            curbase += curDocs.length;
+            curDocs = cachedDocs.get(chunkUpto);
+            chunkUpto++;
+            curupto = 0;
+          }
+          other.collect(curDocs[curupto++]);
+        }
       }
-      upto = 0;
     }
-
-    curDocs[upto] = doc;
-    // TODO: maybe specialize private subclass so we don't
-    // null check per collect...
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+      other.setScorer(scorer);
+    }
+
+    @Override
+    public String toString() {
+      if (isCached()) {
+        return "CachingCollector (" + (base+upto) + " docs cached)";
+      } else {
+        return "CachingCollector (cache was cleared)";
+      }
+    }
+
+  }
+
+  // TODO: would be nice if a collector defined a
+  // needsScores() method so we can specialize / do checks
+  // up front. This is only relevant for the ScoreCaching
+  // version -- if the wrapped Collector does not need
+  // scores, it can avoid cachedScorer entirely.
+  protected final Collector other;
+
+  protected final int maxDocsToCache;
+  protected final List<SegStart> cachedSegs = new ArrayList<SegStart>();
+  protected final List<int[]> cachedDocs;
+
+  protected int[] curDocs;
+  protected int upto;
+  protected IndexReader lastReader;
+  protected int base;
+  protected int lastDocBase;
+
+  public static CachingCollector create(Collector other, boolean cacheScores, double maxRAMMB) {
+    return cacheScores ? new ScoreCachingCollector(other, maxRAMMB) : new NoScoreCachingCollector(other, maxRAMMB);
+  }
+
+  // Prevent extension from non-internal classes
+  private CachingCollector(Collector other, double maxRAMMB, boolean cacheScores) {
+    this.other = other;
+
+    cachedDocs = new ArrayList<int[]>();
+    curDocs = new int[INITIAL_ARRAY_SIZE];
+    cachedDocs.add(curDocs);
+
+    int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
     if (cacheScores) {
-      cachedScorer.score = curScores[upto] = scorer.score();
+      bytesPerDoc += RamUsageEstimator.NUM_BYTES_FLOAT;
     }
-    upto++;
-    cachedScorer.doc = doc;
-    other.collect(doc);
+    maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
   }
+  @Override
+  public boolean acceptsDocsOutOfOrder() {
+    return other.acceptsDocsOutOfOrder();
+  }
+
   public boolean isCached() {
     return curDocs != null;
   }
@@ -223,26 +347,8 @@
     lastReader = reader;
   }
 
-  @Override
-  public String toString() {
-    if (isCached()) {
-      return "CachingCollector (" + (base+upto) + " docs " + (cacheScores ? " & scores" : "") + " cached)";
-    } else {
-      return "CachingCollector (cache was cleared)";
-    }
-  }
-
-  /**
-   * Replays the cached doc IDs (and scores) to the given Collector.
-   *
-   * @throws IllegalStateException
-   *           if this collector is not cached (i.e., if the RAM limits were too
-   *           low for the number of documents + scores to cache).
-   * @throws IllegalArgumentException
-   *           if the given Collect's does not support out-of-order collection,
-   *           while the collector passed to the ctor does.
-   */
-  public void replay(Collector other) throws IOException {
+  /** Reused by the specialized inner classes. */
+  void replayInit(Collector other) {
     if (!isCached()) {
       throw new IllegalStateException("cannot replay: cache was cleared because too much RAM was required");
     }
@@ -259,29 +365,20 @@
       cachedSegs.add(new SegStart(lastReader, lastDocBase, base+upto));
       lastReader = null;
     }
-
-    int curupto = 0;
-    int curbase = 0;
-    int chunkUpto = 0;
-    other.setScorer(cachedScorer);
-    curDocs = EMPTY_INT_ARRAY;
-    for (SegStart seg : cachedSegs) {
-      other.setNextReader(seg.reader, seg.base);
-      while (curbase + curupto < seg.end) {
-        if (curupto == curDocs.length) {
-          curbase += curDocs.length;
-          curDocs = cachedDocs.get(chunkUpto);
-          if (cacheScores) {
-            curScores = cachedScores.get(chunkUpto);
-          }
-          chunkUpto++;
-          curupto = 0;
-        }
-        if (cacheScores) {
-          cachedScorer.score = curScores[curupto];
-        }
-        other.collect(curDocs[curupto++]);
-      }
-    }
   }
+
+  /**
+   * Replays the cached doc IDs (and scores) to the given Collector. If this
+   * instance does not cache scores, then the Scorer is not set on
+   * {@code other} and scores are not replayed.
+   *
+   * @throws IllegalStateException
+   *           if this collector is not cached (i.e., if the RAM limits were too
+   *           low for the number of documents + scores to cache).
+   * @throws IllegalArgumentException
+   *           if the given Collector does not support out-of-order collection,
+   *           while the collector passed to the ctor does.
+   */
+  public abstract void replay(Collector other) throws IOException;
+
 }
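
Usage after this patch (a minimal sketch, not part of the diff itself): a CachingCollector is now obtained through the static create factory, which selects the score-caching or non-score-caching specialization, and the abstract class can no longer be instantiated directly. In the example below the IndexSearcher and Query inputs, the class name, and the 16 MB budget are illustrative assumptions; TopScoreDocCollector and TotalHitCountCollector are existing core collectors.

    import java.io.IOException;

    import org.apache.lucene.search.CachingCollector;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.TopScoreDocCollector;
    import org.apache.lucene.search.TotalHitCountCollector;

    /** Minimal sketch of the post-patch API; searcher and query are assumed inputs. */
    class CachingCollectorExample {
      static void searchThenReplay(IndexSearcher searcher, Query query) throws IOException {
        // create() returns the ScoreCachingCollector or NoScoreCachingCollector
        // specialization; callers only see the abstract CachingCollector type.
        TopScoreDocCollector top = TopScoreDocCollector.create(10, true);
        CachingCollector cache = CachingCollector.create(top, true /* cacheScores */, 16.0 /* maxRAMMB */);

        searcher.search(query, cache);

        // Replay the cached doc IDs (and scores) to a second collector instead
        // of re-executing the query. If the RAM budget was exceeded, the cache
        // is voided and replay() throws IllegalStateException, so check first.
        if (cache.isCached()) {
          cache.replay(new TotalHitCountCollector());
        }
      }
    }

Splitting the collector into per-case subclasses moves the cacheScores branching out of the per-document collect() hot path, which is what the removed "TODO: maybe specialize private subclass so we don't null check per collect" comment asked for.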