Index: lucene/CHANGES.txt =================================================================== --- lucene/CHANGES.txt (revision 1475961) +++ lucene/CHANGES.txt (working copy) @@ -46,6 +46,10 @@ if you had a 64-bit JVM without compressed OOPS: IBM J9, or Oracle with large heap/explicitly disabled. (Mike McCandless, Uwe Schindler, Robert Muir) +* LUCENE-4953: Fixed ParallelCompositeReader to inform ReaderClosedListeners of + its synthetic subreaders. FieldCaches keyed on the atomic childs will purged + earlier and FC insanity prevented. (Mike McCandless, Uwe Schindler) + Optimizations * LUCENE-4938: Don't use an unnecessarily large priority queue in IndexSearcher Index: lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java (revision 1475961) +++ lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java (working copy) @@ -47,7 +47,7 @@ * same order to the other indexes. Failure to do so will result in * undefined behavior. */ -public final class ParallelAtomicReader extends AtomicReader { +public class ParallelAtomicReader extends AtomicReader { private final FieldInfos fieldInfos; private final ParallelFields fields = new ParallelFields(); private final AtomicReader[] parallelReaders, storedFieldsReaders; Index: lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java (revision 1475961) +++ lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java (working copy) @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.Collections; import java.util.IdentityHashMap; -import java.util.Iterator; import java.util.List; import java.util.Set; @@ -47,10 +46,10 @@ * by number of documents per segment. If you use different {@link MergePolicy}s * it might happen that the segment structure of your index is no longer predictable. */ -public final class ParallelCompositeReader extends BaseCompositeReader { +public class ParallelCompositeReader extends BaseCompositeReader { private final boolean closeSubReaders; - private final Set completeReaderSet = - Collections.newSetFromMap(new IdentityHashMap()); + private final Set completeReaderSet = + Collections.newSetFromMap(new IdentityHashMap()); /** Create a ParallelCompositeReader based on the provided * readers; auto-closes the given readers on {@link #close()}. */ @@ -72,12 +71,14 @@ this.closeSubReaders = closeSubReaders; Collections.addAll(completeReaderSet, readers); Collections.addAll(completeReaderSet, storedFieldReaders); - // do this finally so any Exceptions occurred before don't affect refcounts: + // update ref-counts (like MultiReader): if (!closeSubReaders) { - for (CompositeReader reader : completeReaderSet) { + for (final IndexReader reader : completeReaderSet) { reader.incRef(); } } + // finally add our own synthetic readers, so we close or decRef them, too (it does not matter what we do) + completeReaderSet.addAll(getSequentialSubReaders()); } private static IndexReader[] prepareSubReaders(CompositeReader[] readers, CompositeReader[] storedFieldsReaders) throws IOException { @@ -112,10 +113,11 @@ for (int j = 0; j < storedFieldsReaders.length; j++) { storedSubs[j] = (AtomicReader) storedFieldsReaders[j].getSequentialSubReaders().get(i); } - // we simply enable closing of subReaders, to prevent incRefs on subReaders - // -> for synthetic subReaders, close() is never - // called by our doClose() - subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs); + // We pass true for closeSubs and we prevent closing of subreaders in doClose(): + // By this the synthetic throw-away readers used here are completely invisible to ref-counting + subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs) { + @Override protected void doClose() {} + }; } else { assert firstSubReaders.get(i) instanceof CompositeReader; final CompositeReader[] compositeSubs = new CompositeReader[readers.length]; @@ -126,9 +128,11 @@ for (int j = 0; j < storedFieldsReaders.length; j++) { storedSubs[j] = (CompositeReader) storedFieldsReaders[j].getSequentialSubReaders().get(i); } - // we simply enable closing of subReaders, to prevent incRefs on subReaders - // -> for synthetic subReaders, close() is never called by our doClose() - subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs); + // We pass true for closeSubs and we prevent closing of subreaders in doClose(): + // By this the synthetic throw-away readers used here are completely invisible to ref-counting + subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs) { + @Override protected void doClose() {} + }; } } return subReaders; @@ -159,19 +163,9 @@ } @Override - public String toString() { - final StringBuilder buffer = new StringBuilder("ParallelCompositeReader("); - for (final Iterator iter = completeReaderSet.iterator(); iter.hasNext();) { - buffer.append(iter.next()); - if (iter.hasNext()) buffer.append(", "); - } - return buffer.append(')').toString(); - } - - @Override protected synchronized void doClose() throws IOException { IOException ioe = null; - for (final CompositeReader reader : completeReaderSet) { + for (final IndexReader reader : completeReaderSet) { try { if (closeSubReaders) { reader.close(); Index: lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java (revision 1475961) +++ lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java (working copy) @@ -25,6 +25,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.*; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util._TestUtil; @@ -114,6 +115,29 @@ dir2.close(); } + public void testCloseInnerReader() throws Exception { + Directory dir1 = getDir1(random()); + AtomicReader ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)); + + // with overlapping + ParallelAtomicReader pr = new ParallelAtomicReader(true, + new AtomicReader[] {ir1}, + new AtomicReader[] {ir1}); + + ir1.close(); + + try { + pr.document(0); + fail("ParallelAtomicReader should be already closed because inner reader was closed!"); + } catch (AlreadyClosedException e) { + // pass + } + + // noop: + pr.close(); + dir1.close(); + } + public void testIncompatibleIndexes() throws IOException { // two documents: Directory dir1 = getDir1(random()); Index: lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java (revision 1475961) +++ lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java (working copy) @@ -23,8 +23,10 @@ import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.index.IndexReader.ReaderClosedListener; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.*; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.LuceneTestCase; @@ -277,6 +279,94 @@ dir2.close(); } + // closeSubreaders=false + public void testReaderClosedListener1() throws Exception { + Directory dir1 = getDir1(random()); + CompositeReader ir1 = DirectoryReader.open(dir1); + + // with overlapping + ParallelCompositeReader pr = new ParallelCompositeReader(false, + new CompositeReader[] {ir1}, + new CompositeReader[] {ir1}); + + final int[] listenerClosedCount = new int[1]; + + assertEquals(3, pr.leaves().size()); + + for(AtomicReaderContext cxt : pr.leaves()) { + cxt.reader().addReaderClosedListener(new ReaderClosedListener() { + @Override + public void onClose(IndexReader reader) { + listenerClosedCount[0]++; + } + }); + } + pr.close(); + ir1.close(); + assertEquals(3, listenerClosedCount[0]); + dir1.close(); + } + + // closeSubreaders=true + public void testReaderClosedListener2() throws Exception { + Directory dir1 = getDir1(random()); + CompositeReader ir1 = DirectoryReader.open(dir1); + + // with overlapping + ParallelCompositeReader pr = new ParallelCompositeReader(true, + new CompositeReader[] {ir1}, + new CompositeReader[] {ir1}); + + final int[] listenerClosedCount = new int[1]; + + assertEquals(3, pr.leaves().size()); + + for(AtomicReaderContext cxt : pr.leaves()) { + cxt.reader().addReaderClosedListener(new ReaderClosedListener() { + @Override + public void onClose(IndexReader reader) { + listenerClosedCount[0]++; + } + }); + } + pr.close(); + assertEquals(3, listenerClosedCount[0]); + dir1.close(); + } + + public void testCloseInnerReader() throws Exception { + Directory dir1 = getDir1(random()); + CompositeReader ir1 = DirectoryReader.open(dir1); + assertEquals(1, ir1.getSequentialSubReaders().get(0).getRefCount()); + + // with overlapping + ParallelCompositeReader pr = new ParallelCompositeReader(true, + new CompositeReader[] {ir1}, + new CompositeReader[] {ir1}); + + ir1.close(); + + IndexReader sub = pr.getSequentialSubReaders().get(0); + assertEquals(1, sub.getRefCount()); + try { + sub.document(0); + fail("Subreader should be already closed because inner reader was closed!"); + } catch (AlreadyClosedException e) { + // pass + } + + try { + pr.document(0); + fail("ParallelCompositeReader should be already closed because inner reader was closed!"); + } catch (AlreadyClosedException e) { + // pass + } + + // noop: + pr.close(); + dir1.close(); + } + private void queryTest(Query query) throws IOException { ScoreDoc[] parallelHits = parallel.search(query, null, 1000).scoreDocs; ScoreDoc[] singleHits = single.search(query, null, 1000).scoreDocs;