Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt (revision 1475961)
+++ lucene/CHANGES.txt (working copy)
@@ -46,6 +46,10 @@
if you had a 64-bit JVM without compressed OOPS: IBM J9, or Oracle with
large heap/explicitly disabled. (Mike McCandless, Uwe Schindler, Robert Muir)
+* LUCENE-4953: Fixed ParallelCompositeReader to inform ReaderClosedListeners of
+ its synthetic subreaders. FieldCaches keyed on the atomic children will be
+ purged earlier and FC insanity prevented. (Mike McCandless, Uwe Schindler)
+
Optimizations
* LUCENE-4938: Don't use an unnecessarily large priority queue in IndexSearcher
Index: lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java (revision 1475961)
+++ lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java (working copy)
@@ -47,7 +47,7 @@
* same order to the other indexes. Failure to do so will result in
* undefined behavior.
*/
-public final class ParallelAtomicReader extends AtomicReader {
+public class ParallelAtomicReader extends AtomicReader {
private final FieldInfos fieldInfos;
private final ParallelFields fields = new ParallelFields();
private final AtomicReader[] parallelReaders, storedFieldsReaders;
Index: lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java (revision 1475961)
+++ lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java (working copy)
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -47,10 +46,10 @@
* by number of documents per segment. If you use different {@link MergePolicy}s
* it might happen that the segment structure of your index is no longer predictable.
*/
-public final class ParallelCompositeReader extends BaseCompositeReader {
+public class ParallelCompositeReader extends BaseCompositeReader {
private final boolean closeSubReaders;
- private final Set completeReaderSet =
- Collections.newSetFromMap(new IdentityHashMap());
+ private final Set completeReaderSet =
+ Collections.newSetFromMap(new IdentityHashMap());
/** Create a ParallelCompositeReader based on the provided
* readers; auto-closes the given readers on {@link #close()}. */
@@ -72,12 +71,14 @@
this.closeSubReaders = closeSubReaders;
Collections.addAll(completeReaderSet, readers);
Collections.addAll(completeReaderSet, storedFieldReaders);
- // do this finally so any Exceptions occurred before don't affect refcounts:
+ // update ref-counts (like MultiReader):
if (!closeSubReaders) {
- for (CompositeReader reader : completeReaderSet) {
+ for (final IndexReader reader : completeReaderSet) {
reader.incRef();
}
}
+ // finally add our own synthetic readers, so we close or decRef them, too (it does not matter what we do)
+ completeReaderSet.addAll(getSequentialSubReaders());
}
private static IndexReader[] prepareSubReaders(CompositeReader[] readers, CompositeReader[] storedFieldsReaders) throws IOException {
@@ -112,10 +113,11 @@
for (int j = 0; j < storedFieldsReaders.length; j++) {
storedSubs[j] = (AtomicReader) storedFieldsReaders[j].getSequentialSubReaders().get(i);
}
- // we simply enable closing of subReaders, to prevent incRefs on subReaders
- // -> for synthetic subReaders, close() is never
- // called by our doClose()
- subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs);
+ // We pass true for closeSubs and we prevent closing of subreaders in doClose():
+ // This way, the synthetic throw-away readers used here are completely invisible to ref-counting
+ subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs) {
+ @Override protected void doClose() {}
+ };
} else {
assert firstSubReaders.get(i) instanceof CompositeReader;
final CompositeReader[] compositeSubs = new CompositeReader[readers.length];
@@ -126,9 +128,11 @@
for (int j = 0; j < storedFieldsReaders.length; j++) {
storedSubs[j] = (CompositeReader) storedFieldsReaders[j].getSequentialSubReaders().get(i);
}
- // we simply enable closing of subReaders, to prevent incRefs on subReaders
- // -> for synthetic subReaders, close() is never called by our doClose()
- subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs);
+ // We pass true for closeSubs and we prevent closing of subreaders in doClose():
+ // This way, the synthetic throw-away readers used here are completely invisible to ref-counting
+ subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs) {
+ @Override protected void doClose() {}
+ };
}
}
return subReaders;
@@ -159,19 +163,9 @@
}
@Override
- public String toString() {
- final StringBuilder buffer = new StringBuilder("ParallelCompositeReader(");
- for (final Iterator iter = completeReaderSet.iterator(); iter.hasNext();) {
- buffer.append(iter.next());
- if (iter.hasNext()) buffer.append(", ");
- }
- return buffer.append(')').toString();
- }
-
- @Override
protected synchronized void doClose() throws IOException {
IOException ioe = null;
- for (final CompositeReader reader : completeReaderSet) {
+ for (final IndexReader reader : completeReaderSet) {
try {
if (closeSubReaders) {
reader.close();
Index: lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java (revision 1475961)
+++ lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java (working copy)
@@ -25,6 +25,7 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.*;
+import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
@@ -114,6 +115,29 @@
dir2.close();
}
+ public void testCloseInnerReader() throws Exception {
+ Directory dir1 = getDir1(random());
+ AtomicReader ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1));
+
+ // with overlapping
+ ParallelAtomicReader pr = new ParallelAtomicReader(true,
+ new AtomicReader[] {ir1},
+ new AtomicReader[] {ir1});
+
+ ir1.close();
+
+ try {
+ pr.document(0);
+ fail("ParallelAtomicReader should be already closed because inner reader was closed!");
+ } catch (AlreadyClosedException e) {
+ // pass
+ }
+
+ // noop:
+ pr.close();
+ dir1.close();
+ }
+
public void testIncompatibleIndexes() throws IOException {
// two documents:
Directory dir1 = getDir1(random());
Index: lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java (revision 1475961)
+++ lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java (working copy)
@@ -23,8 +23,10 @@
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexReader.ReaderClosedListener;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.*;
+import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
@@ -277,6 +279,94 @@
dir2.close();
}
+ // closeSubreaders=false
+ public void testReaderClosedListener1() throws Exception {
+ Directory dir1 = getDir1(random());
+ CompositeReader ir1 = DirectoryReader.open(dir1);
+
+ // with overlapping
+ ParallelCompositeReader pr = new ParallelCompositeReader(false,
+ new CompositeReader[] {ir1},
+ new CompositeReader[] {ir1});
+
+ final int[] listenerClosedCount = new int[1];
+
+ assertEquals(3, pr.leaves().size());
+
+ for(AtomicReaderContext cxt : pr.leaves()) {
+ cxt.reader().addReaderClosedListener(new ReaderClosedListener() {
+ @Override
+ public void onClose(IndexReader reader) {
+ listenerClosedCount[0]++;
+ }
+ });
+ }
+ pr.close();
+ ir1.close();
+ assertEquals(3, listenerClosedCount[0]);
+ dir1.close();
+ }
+
+ // closeSubreaders=true
+ public void testReaderClosedListener2() throws Exception {
+ Directory dir1 = getDir1(random());
+ CompositeReader ir1 = DirectoryReader.open(dir1);
+
+ // with overlapping
+ ParallelCompositeReader pr = new ParallelCompositeReader(true,
+ new CompositeReader[] {ir1},
+ new CompositeReader[] {ir1});
+
+ final int[] listenerClosedCount = new int[1];
+
+ assertEquals(3, pr.leaves().size());
+
+ for(AtomicReaderContext cxt : pr.leaves()) {
+ cxt.reader().addReaderClosedListener(new ReaderClosedListener() {
+ @Override
+ public void onClose(IndexReader reader) {
+ listenerClosedCount[0]++;
+ }
+ });
+ }
+ pr.close();
+ assertEquals(3, listenerClosedCount[0]);
+ dir1.close();
+ }
+
+ public void testCloseInnerReader() throws Exception {
+ Directory dir1 = getDir1(random());
+ CompositeReader ir1 = DirectoryReader.open(dir1);
+ assertEquals(1, ir1.getSequentialSubReaders().get(0).getRefCount());
+
+ // with overlapping
+ ParallelCompositeReader pr = new ParallelCompositeReader(true,
+ new CompositeReader[] {ir1},
+ new CompositeReader[] {ir1});
+
+ ir1.close();
+
+ IndexReader sub = pr.getSequentialSubReaders().get(0);
+ assertEquals(1, sub.getRefCount());
+ try {
+ sub.document(0);
+ fail("Subreader should be already closed because inner reader was closed!");
+ } catch (AlreadyClosedException e) {
+ // pass
+ }
+
+ try {
+ pr.document(0);
+ fail("ParallelCompositeReader should be already closed because inner reader was closed!");
+ } catch (AlreadyClosedException e) {
+ // pass
+ }
+
+ // noop:
+ pr.close();
+ dir1.close();
+ }
+
private void queryTest(Query query) throws IOException {
ScoreDoc[] parallelHits = parallel.search(query, null, 1000).scoreDocs;
ScoreDoc[] singleHits = single.search(query, null, 1000).scoreDocs;