Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt (revision 1147803)
+++ lucene/CHANGES.txt (working copy)
@@ -489,6 +489,11 @@
MultiTermQuery now stores TermState per leaf reader during rewrite to re-
seek the term dictionary in TermQuery / TermWeight.
(Simon Willnauer, Mike McCandless, Robert Muir)
+
+* LUCENE-3292: IndexWriter no longer shares the same SegmentReader
+ instance for merging and NRT readers, which enables directory impls
+ to separately tune IO flags for each. (Varun Thacker, Simon
+  Willnauer, Mike McCandless)
Bug fixes
Index: lucene/src/test/org/apache/lucene/TestExternalCodecs.java
===================================================================
--- lucene/src/test/org/apache/lucene/TestExternalCodecs.java (revision 1147803)
+++ lucene/src/test/org/apache/lucene/TestExternalCodecs.java (working copy)
@@ -92,10 +92,6 @@
@Override
public void close() {
}
-
- @Override
- public void loadTermsIndex(int indexDivisor) {
- }
}
static class RAMField extends Terms {
Index: lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java (revision 1147803)
+++ lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java (working copy)
@@ -90,6 +90,7 @@
for (int r = 0; r < 3; r++) {
final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(random)).setMaxBufferedDocs(2));
+ w.setInfoStream(VERBOSE ? System.out : null);
final int numUpdates = atLeast(20);
int numThreads = _TestUtil.nextInt(random, 2, 6);
IndexingThread[] threads = new IndexingThread[numThreads];
Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (revision 1147803)
+++ lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (working copy)
@@ -551,7 +551,9 @@
if (!success) {
// Must force the close else the writer can have
// open files which cause exc in MockRAMDir.close
-
+ if (VERBOSE) {
+ System.out.println("TEST: now rollback");
+ }
modifier.rollback();
}
Index: lucene/src/java/org/apache/lucene/index/SegmentReader.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentReader.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/SegmentReader.java (working copy)
@@ -31,7 +31,6 @@
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.index.FieldInfo.IndexOptions;
import org.apache.lucene.index.codecs.PerDocValues;
-import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -161,9 +160,6 @@
// NOTE: the bitvector is stored using the regular directory, not cfs
if (hasDeletions(si)) {
liveDocs = new BitVector(directory(), si.getDelFileName(), new IOContext(context, true));
- if (liveDocs.getVersion() < BitVector.VERSION_DGAPS_CLEARED) {
- liveDocs.invertAll();
- }
liveDocsRef = new AtomicInteger(1);
assert checkLiveCounts();
if (liveDocs.size() != si.docCount) {
@@ -637,15 +633,6 @@
}
}
- // NOTE: only called from IndexWriter when a near
- // real-time reader is opened, or applyDeletes is run,
- // sharing a segment that's still being merged. This
- // method is not thread safe, and relies on the
- // synchronization in IndexWriter
- void loadTermsIndex(int indexDivisor) throws IOException {
- core.fields.loadTermsIndex(indexDivisor);
- }
-
// for testing only
boolean normsClosed() {
if (singleNormStream != null) {
Index: lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (working copy)
@@ -155,11 +155,6 @@
return 0;
}
}
-
- @Override
- public boolean equals(Object other) {
- return sortSegInfoByDelGen == other;
- }
};
/** Resolves the buffered deleted Term/Query/docIDs, into
@@ -240,7 +235,7 @@
delCount += applyQueryDeletes(packet.queriesIterable(), reader);
segAllDeletes = reader.numDocs() == 0;
} finally {
- readerPool.release(reader);
+ readerPool.release(reader, IOContext.Context.READ);
}
anyNewDeletes |= delCount > 0;
@@ -282,7 +277,7 @@
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
segAllDeletes = reader.numDocs() == 0;
} finally {
- readerPool.release(reader);
+ readerPool.release(reader, IOContext.Context.READ);
}
anyNewDeletes |= delCount > 0;
Index: lucene/src/java/org/apache/lucene/index/MergePolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/MergePolicy.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/MergePolicy.java (working copy)
@@ -17,16 +17,17 @@
* limitations under the License.
*/
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.SetOnce.AlreadySetException;
import org.apache.lucene.util.SetOnce;
-import org.apache.lucene.util.SetOnce.AlreadySetException;
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-
/**
  * <p>Expert: a MergePolicy determines the sequence of
* primitive merge operations to be used for overall merge
@@ -75,7 +76,7 @@
int maxNumSegmentsOptimize; // used by IndexWriter
public long estimatedMergeBytes; // used by IndexWriter
     List<SegmentReader> readers;        // used by IndexWriter
-    List<SegmentReader> readerClones;   // used by IndexWriter
+    List<BitVector> readerLiveDocs;     // used by IndexWriter
public final List segments;
public final int totalDocCount;
boolean aborted;
Index: lucene/src/java/org/apache/lucene/index/DirectoryReader.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DirectoryReader.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/DirectoryReader.java (working copy)
@@ -143,12 +143,12 @@
}
// Used by near real-time search
- DirectoryReader(IndexWriter writer, SegmentInfos infos, int termInfosIndexDivisor, CodecProvider codecs, boolean applyAllDeletes) throws IOException {
+ DirectoryReader(IndexWriter writer, SegmentInfos infos, CodecProvider codecs, boolean applyAllDeletes) throws IOException {
this.directory = writer.getDirectory();
this.readOnly = true;
this.applyAllDeletes = applyAllDeletes; // saved for reopen
- this.termInfosIndexDivisor = termInfosIndexDivisor;
+ this.termInfosIndexDivisor = writer.getConfig().getReaderTermsIndexDivisor();
if (codecs == null) {
this.codecs = CodecProvider.getDefault();
} else {
@@ -171,8 +171,7 @@
try {
final SegmentInfo info = infos.info(i);
assert info.dir == dir;
- final SegmentReader reader = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor,
- IOContext.READ);
+ final SegmentReader reader = writer.readerPool.getReadOnlyClone(info, IOContext.READ);
if (reader.numDocs() > 0 || writer.getKeepFullyDeletedSegments()) {
reader.readerFinishedListeners = readerFinishedListeners;
readers.add(reader);
Index: lucene/src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentMerger.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/SegmentMerger.java (working copy)
@@ -54,7 +54,7 @@
private String segment;
private int termIndexInterval = IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL;
-  private List<IndexReader> readers = new ArrayList<IndexReader>();
+  private List<MergeState.IndexReaderAndLiveDocs> readers = new ArrayList<MergeState.IndexReaderAndLiveDocs>();
private final FieldInfos fieldInfos;
private int mergedDocs;
@@ -100,9 +100,23 @@
* @param reader
*/
final void add(IndexReader reader) {
- ReaderUtil.gatherSubReaders(readers, reader);
+ try {
+ new ReaderUtil.Gather(reader) {
+ @Override
+ protected void add(int base, IndexReader r) {
+ readers.add(new MergeState.IndexReaderAndLiveDocs(r, r.getLiveDocs()));
+ }
+ }.run();
+ } catch (IOException ioe) {
+ // won't happen
+ throw new RuntimeException(ioe);
+ }
}
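+
+  /** Adds a single reader together with an explicit liveDocs to merge
+   *  against; used by IndexWriter, which passes a frozen snapshot of the
+   *  segment's deletions rather than the reader's own (possibly changing)
+   *  deletes. */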
+ final void add(SegmentReader reader, Bits liveDocs) {
+ readers.add(new MergeState.IndexReaderAndLiveDocs(reader, liveDocs));
+ }
+
/**
* Merges the readers specified by the {@link #add} method into the directory passed to the constructor
* @return The number of documents that were merged
@@ -122,8 +136,9 @@
mergePerDoc();
mergeNorms();
- if (fieldInfos.hasVectors())
+ if (fieldInfos.hasVectors()) {
mergeVectors();
+ }
return mergedDocs;
}
@@ -188,9 +203,9 @@
// FieldInfos, then we can do a bulk copy of the
// stored fields:
for (int i = 0; i < numReaders; i++) {
- IndexReader reader = readers.get(i);
- if (reader instanceof SegmentReader) {
- SegmentReader segmentReader = (SegmentReader) reader;
+ MergeState.IndexReaderAndLiveDocs reader = readers.get(i);
+ if (reader.reader instanceof SegmentReader) {
+ SegmentReader segmentReader = (SegmentReader) reader.reader;
boolean same = true;
FieldInfos segmentFieldInfos = segmentReader.fieldInfos();
for (FieldInfo fi : segmentFieldInfos) {
@@ -215,7 +230,8 @@
* @throws IOException if there is a low-level IO error
*/
private int mergeFields() throws CorruptIndexException, IOException {
- for (IndexReader reader : readers) {
+ for (MergeState.IndexReaderAndLiveDocs readerAndLiveDocs : readers) {
+ final IndexReader reader = readerAndLiveDocs.reader;
if (reader instanceof SegmentReader) {
SegmentReader segmentReader = (SegmentReader) reader;
FieldInfos readerFieldInfos = segmentReader.fieldInfos();
@@ -244,7 +260,7 @@
final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, context);
try {
int idx = 0;
- for (IndexReader reader : readers) {
+ for (MergeState.IndexReaderAndLiveDocs reader : readers) {
final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++];
FieldsReader matchingFieldsReader = null;
if (matchingSegmentReader != null) {
@@ -253,7 +269,7 @@
matchingFieldsReader = fieldsReader;
}
}
- if (reader.hasDeletions()) {
+ if (reader.liveDocs != null) {
docCount += copyFieldsWithDeletions(fieldsWriter,
reader, matchingFieldsReader);
} else {
@@ -280,12 +296,12 @@
return docCount;
}
- private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final IndexReader reader,
+ private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final MergeState.IndexReaderAndLiveDocs reader,
final FieldsReader matchingFieldsReader)
throws IOException, MergeAbortedException, CorruptIndexException {
int docCount = 0;
- final int maxDoc = reader.maxDoc();
- final Bits liveDocs = reader.getLiveDocs();
+ final int maxDoc = reader.reader.maxDoc();
+ final Bits liveDocs = reader.liveDocs;
assert liveDocs != null;
if (matchingFieldsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
@@ -321,7 +337,7 @@
}
// NOTE: it's very important to first assign to doc then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
- Document doc = reader.document(j);
+ Document doc = reader.reader.document(j);
fieldsWriter.addDocument(doc, fieldInfos);
docCount++;
checkAbort.work(300);
@@ -330,10 +346,10 @@
return docCount;
}
- private int copyFieldsNoDeletions(final FieldsWriter fieldsWriter, final IndexReader reader,
+ private int copyFieldsNoDeletions(final FieldsWriter fieldsWriter, final MergeState.IndexReaderAndLiveDocs reader,
final FieldsReader matchingFieldsReader)
throws IOException, MergeAbortedException, CorruptIndexException {
- final int maxDoc = reader.maxDoc();
+ final int maxDoc = reader.reader.maxDoc();
int docCount = 0;
if (matchingFieldsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
@@ -348,7 +364,7 @@
for (; docCount < maxDoc; docCount++) {
// NOTE: it's very important to first assign to doc then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
- Document doc = reader.document(docCount);
+ Document doc = reader.reader.document(docCount);
fieldsWriter.addDocument(doc, fieldInfos);
checkAbort.work(300);
}
@@ -361,12 +377,11 @@
* @throws IOException
*/
private final void mergeVectors() throws IOException {
- TermVectorsWriter termVectorsWriter =
- new TermVectorsWriter(directory, segment, fieldInfos, context);
+ TermVectorsWriter termVectorsWriter = new TermVectorsWriter(directory, segment, fieldInfos, context);
try {
int idx = 0;
- for (final IndexReader reader : readers) {
+ for (final MergeState.IndexReaderAndLiveDocs reader : readers) {
final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++];
TermVectorsReader matchingVectorsReader = null;
if (matchingSegmentReader != null) {
@@ -377,11 +392,10 @@
matchingVectorsReader = vectorsReader;
}
}
- if (reader.hasDeletions()) {
+ if (reader.liveDocs != null) {
copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, reader);
} else {
copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, reader);
-
}
}
} finally {
@@ -402,10 +416,10 @@
private void copyVectorsWithDeletions(final TermVectorsWriter termVectorsWriter,
final TermVectorsReader matchingVectorsReader,
- final IndexReader reader)
+ final MergeState.IndexReaderAndLiveDocs reader)
throws IOException, MergeAbortedException {
- final int maxDoc = reader.maxDoc();
- final Bits liveDocs = reader.getLiveDocs();
+ final int maxDoc = reader.reader.maxDoc();
+ final Bits liveDocs = reader.liveDocs;
if (matchingVectorsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
for (int docNum = 0; docNum < maxDoc;) {
@@ -440,7 +454,7 @@
// NOTE: it's very important to first assign to vectors then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
- TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
+ TermFreqVector[] vectors = reader.reader.getTermFreqVectors(docNum);
termVectorsWriter.addAllDocVectors(vectors);
checkAbort.work(300);
}
@@ -449,9 +463,9 @@
private void copyVectorsNoDeletions(final TermVectorsWriter termVectorsWriter,
final TermVectorsReader matchingVectorsReader,
- final IndexReader reader)
+ final MergeState.IndexReaderAndLiveDocs reader)
throws IOException, MergeAbortedException {
- final int maxDoc = reader.maxDoc();
+ final int maxDoc = reader.reader.maxDoc();
if (matchingVectorsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
int docCount = 0;
@@ -466,7 +480,7 @@
for (int docNum = 0; docNum < maxDoc; docNum++) {
// NOTE: it's very important to first assign to vectors then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
- TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
+ TermFreqVector[] vectors = reader.reader.getTermFreqVectors(docNum);
termVectorsWriter.addAllDocVectors(vectors);
checkAbort.work(300);
}
@@ -487,23 +501,17 @@
     final List<Fields> fields = new ArrayList<Fields>();
     final List<ReaderUtil.Slice> slices = new ArrayList<ReaderUtil.Slice>();
-    final List<Bits> bits = new ArrayList<Bits>();
-    final List<Integer> bitsStarts = new ArrayList<Integer>();
- for(IndexReader r : readers) {
- final Fields f = r.fields();
- final int maxDoc = r.maxDoc();
+ for(MergeState.IndexReaderAndLiveDocs r : readers) {
+ final Fields f = r.reader.fields();
+ final int maxDoc = r.reader.maxDoc();
if (f != null) {
slices.add(new ReaderUtil.Slice(docBase, maxDoc, fields.size()));
fields.add(f);
- bits.add(r.getLiveDocs());
- bitsStarts.add(docBase);
}
docBase += maxDoc;
}
- bitsStarts.add(docBase);
-
// we may gather more readers than mergeState.readerCount
mergeState = new MergeState();
mergeState.readers = readers;
@@ -524,31 +532,32 @@
for(int i=0;i it = codecs.values().iterator();
- while (it.hasNext()) {
- it.next().loadTermsIndex(indexDivisor);
- }
- }
}
@Override
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java (working copy)
@@ -53,7 +53,6 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.Bits;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.StringHelper;
@@ -378,7 +377,7 @@
// just like we do when loading segments_N
synchronized(this) {
maybeApplyDeletes(applyAllDeletes);
- r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
+ r = new DirectoryReader(this, segmentInfos, codecs, applyAllDeletes);
if (infoStream != null) {
message("return reader version=" + r.getVersion() + " reader=" + r);
}
@@ -416,18 +415,48 @@
* has been called on this instance). */
class ReaderPool {
+
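+    // Readers are now pooled per (SegmentInfo, IOContext.Context): a segment
+    // may have one SegmentReader opened for merging (MERGE) and another for
+    // NRT readers / applying deletes (READ), each with its own IO context.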
+ final class SegmentCacheKey {
+ public final SegmentInfo si;
+ public final IOContext.Context context;
+
+ public SegmentCacheKey(SegmentInfo segInfo, IOContext.Context context) {
+ assert context == IOContext.Context.MERGE || context == IOContext.Context.READ;
+ this.si = segInfo;
+ this.context = context;
+ }
+
+ @Override
+ public int hashCode() {
+ return si.hashCode() + context.hashCode();
+ }
-    private final Map<SegmentInfo,SegmentReader> readerMap = new HashMap<SegmentInfo,SegmentReader>();
+ @Override
+ public String toString() {
+ return "SegmentCacheKey(" + si + "," + context + ")";
+ }
+ @Override
+ public boolean equals(Object _other) {
+ if (!(_other instanceof SegmentCacheKey)) {
+ return false;
+ }
+ final SegmentCacheKey other = (SegmentCacheKey) _other;
+ return si.equals(other.si) && context == other.context;
+ }
+ }
+
+    private final Map<SegmentCacheKey,SegmentReader> readerMap = new HashMap<SegmentCacheKey,SegmentReader>();
+
/** Forcefully clear changes for the specified segments. This is called on successful merge. */
     synchronized void clear(List<SegmentInfo> infos) throws IOException {
if (infos == null) {
-        for (Map.Entry<SegmentInfo,SegmentReader> ent: readerMap.entrySet()) {
+        for (Map.Entry<SegmentCacheKey,SegmentReader> ent: readerMap.entrySet()) {
ent.getValue().hasChanges = false;
}
} else {
for (final SegmentInfo info: infos) {
- final SegmentReader r = readerMap.get(info);
+ final SegmentReader r = readerMap.get(new SegmentCacheKey(info, IOContext.Context.MERGE));
if (r != null) {
r.hasChanges = false;
}
@@ -437,9 +466,13 @@
// used only by asserts
public synchronized boolean infoIsLive(SegmentInfo info) {
+ return infoIsLive(info, "");
+ }
+
+ public synchronized boolean infoIsLive(SegmentInfo info, String message) {
int idx = segmentInfos.indexOf(info);
- assert idx != -1: "info=" + info + " isn't in pool";
- assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos";
+ assert idx != -1: "info=" + info + " isn't live: " + message;
+ assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos: " + message;
return true;
}
@@ -460,8 +493,8 @@
* @param sr
* @throws IOException
*/
- public synchronized boolean release(SegmentReader sr) throws IOException {
- return release(sr, false);
+ public synchronized boolean release(SegmentReader sr, IOContext.Context context) throws IOException {
+ return release(sr, false, context);
}
/**
@@ -474,11 +507,33 @@
* @throws IOException
*/
public synchronized boolean release(SegmentReader sr, boolean drop) throws IOException {
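+      // Determine under which context (READ or MERGE) this reader is pooled,
+      // then release it against that cache key: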
+ final SegmentCacheKey cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.READ);
+ final SegmentReader other = readerMap.get(cacheKey);
+ if (sr == other) {
+ return release(sr, drop, IOContext.Context.READ);
+ } else {
+ assert sr == readerMap.get(new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.MERGE));
+ return release(sr, drop, IOContext.Context.MERGE);
+ }
+ }
- final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
+ /**
+ * Release the segment reader (i.e. decRef it and close if there
+   * are no more references).
+ * @return true if this release altered the index (eg
+ * the SegmentReader had pending changes to del docs and
+ * was closed). Caller must call checkpoint() if so.
+ * @param sr
+ * @throws IOException
+ */
+ public synchronized boolean release(SegmentReader sr, boolean drop, IOContext.Context context) throws IOException {
- assert !pooled || readerMap.get(sr.getSegmentInfo()) == sr;
+ SegmentCacheKey cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), context);
+
+ final boolean pooled = readerMap.containsKey(cacheKey);
+ assert !pooled || readerMap.get(cacheKey) == sr;
+
// Drop caller's ref; for an external reader (not
// pooled), this decRef will close it
sr.decRef();
@@ -492,18 +547,34 @@
// Discard (don't save) changes when we are dropping
// the reader; this is used only on the sub-readers
// after a successful merge.
- sr.hasChanges &= !drop;
+ final boolean hasChanges;
+ if (drop) {
+ hasChanges = sr.hasChanges = false;
+ } else {
+ hasChanges = sr.hasChanges;
+ }
- final boolean hasChanges = sr.hasChanges;
-
// Drop our ref -- this will commit any pending
// changes to the dir
sr.close();
// We are the last ref to this reader; since we're
// not pooling readers, we release it:
- readerMap.remove(sr.getSegmentInfo());
+ readerMap.remove(cacheKey);
+ if (drop && context == IOContext.Context.MERGE) {
+ // Also drop the READ reader if present: we don't
+ // need its deletes since they've been carried
+ // over to the merged segment
+ cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.READ);
+ SegmentReader sr2 = readerMap.get(cacheKey);
+ if (sr2 != null) {
+ readerMap.remove(cacheKey);
+ sr2.hasChanges = false;
+ sr2.close();
+ }
+ }
+
return hasChanges;
}
@@ -511,16 +582,26 @@
}
     public synchronized void drop(List<SegmentInfo> infos) throws IOException {
+ drop(infos, IOContext.Context.READ);
+ drop(infos, IOContext.Context.MERGE);
+ }
+
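+    // Drops (and closes) only the reader pooled under the given context; a
+    // reader pooled under the other context, if any, is left in the pool.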
+    public synchronized void drop(List<SegmentInfo> infos, IOContext.Context context) throws IOException {
for(SegmentInfo info : infos) {
- drop(info);
+ drop(info, context);
}
}
public synchronized void drop(SegmentInfo info) throws IOException {
- final SegmentReader sr = readerMap.get(info);
- if (sr != null) {
+ drop(info, IOContext.Context.READ);
+ drop(info, IOContext.Context.MERGE);
+ }
+
+ public synchronized void drop(SegmentInfo info, IOContext.Context context) throws IOException {
+ final SegmentReader sr;
+ if ((sr = readerMap.remove(new SegmentCacheKey(info, context))) != null) {
sr.hasChanges = false;
- readerMap.remove(info);
+ readerMap.remove(new SegmentCacheKey(info, context));
sr.close();
}
}
@@ -532,14 +613,14 @@
// sync'd on IW:
assert Thread.holdsLock(IndexWriter.this);
-      Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
+      Iterator<Map.Entry<SegmentCacheKey,SegmentReader>> iter = readerMap.entrySet().iterator();
       while (iter.hasNext()) {
-        Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
+        Map.Entry<SegmentCacheKey,SegmentReader> ent = iter.next();
SegmentReader sr = ent.getValue();
if (sr.hasChanges) {
- assert infoIsLive(sr.getSegmentInfo());
+ assert infoIsLive(sr.getSegmentInfo(), "key=" + ent.getKey());
sr.doCommit(null);
// Must checkpoint w/ deleter, because this
@@ -567,10 +648,9 @@
// We invoke deleter.checkpoint below, so we must be
// sync'd on IW:
assert Thread.holdsLock(IndexWriter.this);
-
+
for (SegmentInfo info : infos) {
-
- final SegmentReader sr = readerMap.get(info);
+ final SegmentReader sr = readerMap.get(new SegmentCacheKey(info, IOContext.Context.READ));
if (sr != null && sr.hasChanges) {
assert infoIsLive(info);
sr.doCommit(null);
@@ -582,13 +662,17 @@
}
}
+ public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, IOContext context) throws IOException {
+ return getReadOnlyClone(info, true, context);
+ }
+
/**
* Returns a ref to a clone. NOTE: this clone is not
* enrolled in the pool, so you should simply close()
* it when you're done (ie, do not call release()).
*/
- public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor, IOContext context) throws IOException {
- SegmentReader sr = get(info, doOpenStores, context, termInfosIndexDivisor);
+ public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, IOContext context) throws IOException {
+ SegmentReader sr = get(info, doOpenStores, context);
try {
return (SegmentReader) sr.clone(true);
} finally {
@@ -596,62 +680,37 @@
}
}
- /**
- * Obtain a SegmentReader from the readerPool. The reader
- * must be returned by calling {@link #release(SegmentReader)}
- * @see #release(SegmentReader)
- * @param info
- * @param doOpenStores
- * @throws IOException
- */
- public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, IOContext context) throws IOException {
- return get(info, doOpenStores, context, config.getReaderTermsIndexDivisor());
+ public synchronized SegmentReader get(SegmentInfo info, IOContext context) throws IOException {
+ return get(info, true, context);
}
/**
* Obtain a SegmentReader from the readerPool. The reader
* must be returned by calling {@link #release(SegmentReader)}
- *
* @see #release(SegmentReader)
* @param info
* @param doOpenStores
- * @param readBufferSize
- * @param termsIndexDivisor
* @throws IOException
*/
- public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, IOContext context, int termsIndexDivisor) throws IOException {
+ public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, IOContext context) throws IOException {
- // if (poolReaders) {
- // readBufferSize = BufferedIndexInput.BUFFER_SIZE;
- // }
-
- // TODO: context should be part of the key used to cache that reader in the pool.
-
- SegmentReader sr = readerMap.get(info);
+ SegmentCacheKey cacheKey = new SegmentCacheKey(info, context.context);
+ SegmentReader sr = readerMap.get(cacheKey);
if (sr == null) {
// TODO: we may want to avoid doing this while
// synchronized
// Returns a ref, which we xfer to readerMap:
- sr = SegmentReader.get(false, info.dir, info, doOpenStores, termsIndexDivisor, context);
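+      // MERGE readers skip loading the terms index (divisor -1); READ (NRT)
+      // readers use the configured reader terms index divisor: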
+ sr = SegmentReader.get(false, info.dir, info, doOpenStores, context.context == IOContext.Context.MERGE ? -1 : config.getReaderTermsIndexDivisor(), context);
sr.readerFinishedListeners = readerFinishedListeners;
if (info.dir == directory) {
// Only pool if reader is not external
- readerMap.put(info, sr);
+ readerMap.put(cacheKey, sr);
}
} else {
if (doOpenStores) {
sr.openDocStores();
}
- if (termsIndexDivisor != -1) {
- // If this reader was originally opened because we
- // needed to merge it, we didn't load the terms
- // index. But now, if the caller wants the terms
- // index (eg because it's doing deletes, or an NRT
- // reader is being opened) we ask the reader to
- // load its terms index.
- sr.loadTermsIndex(termsIndexDivisor);
- }
}
// Return a ref to our caller
@@ -664,13 +723,23 @@
// Returns a ref
public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException {
- SegmentReader sr = readerMap.get(info);
+ SegmentReader sr = getIfExists(info, IOContext.Context.READ);
+ if (sr == null) {
+ sr = getIfExists(info, IOContext.Context.MERGE);
+ }
+ return sr;
+ }
+
+ // Returns a ref
+ public synchronized SegmentReader getIfExists(SegmentInfo info, IOContext.Context context) throws IOException {
+ SegmentCacheKey cacheKey = new SegmentCacheKey(info, context);
+ SegmentReader sr = readerMap.get(cacheKey);
if (sr != null) {
sr.incRef();
}
return sr;
}
- }
+ }
/**
* Obtain the number of deleted docs for a pooled reader.
@@ -687,7 +756,7 @@
}
} finally {
if (reader != null) {
- readerPool.release(reader);
+ readerPool.release(reader, false);
}
}
}
@@ -2853,7 +2922,7 @@
}
if (!keepFullyDeletedSegments && result.allDeleted != null) {
if (infoStream != null) {
- message("drop 100% deleted segments: " + result.allDeleted);
+ message("drop 100% deleted segments: " + segString(result.allDeleted));
}
for (SegmentInfo info : result.allDeleted) {
// If a merge has already registered for this
@@ -2929,17 +2998,28 @@
for(int i=0; i < sourceSegments.size(); i++) {
SegmentInfo info = sourceSegments.get(i);
minGen = Math.min(info.getBufferedDeletesGen(), minGen);
- int docCount = info.docCount;
- final SegmentReader previousReader = merge.readerClones.get(i);
- if (previousReader == null) {
- // Reader was skipped because it was 100% deletions
- continue;
+ final int docCount = info.docCount;
+ final BitVector prevLiveDocs = merge.readerLiveDocs.get(i);
+ final BitVector currentLiveDocs;
+ {
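+        // Pull the segment's current deletes: prefer the pooled READ reader's
+        // liveDocs; otherwise load the .del file directly (or null if the
+        // segment has no deletions):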
+ final SegmentReader currentReader = readerPool.getIfExists(info, IOContext.Context.READ);
+ if (currentReader != null) {
+ currentLiveDocs = (BitVector) currentReader.getLiveDocs();
+ readerPool.release(currentReader, false, IOContext.Context.READ);
+ } else {
+ assert readerPool.infoIsLive(info);
+ if (info.hasDeletions()) {
+ currentLiveDocs = new BitVector(directory,
+ info.getDelFileName(),
+ new IOContext(IOContext.Context.READ));
+ } else {
+ currentLiveDocs = null;
+ }
+ }
}
- final Bits prevLiveDocs = previousReader.getLiveDocs();
- final SegmentReader currentReader = merge.readers.get(i);
- final Bits currentLiveDocs = currentReader.getLiveDocs();
- if (previousReader.hasDeletions()) {
+ if (prevLiveDocs != null) {
+
// There were deletes on this segment when the merge
// started. The merge has collapsed away those
// deletes, but, if new deletes were flushed since
@@ -2947,14 +3027,14 @@
// newly flushed deletes but mapping them to the new
// docIDs.
- if (currentReader.numDeletedDocs() > previousReader.numDeletedDocs()) {
- // This means this segment has had new deletes
- // committed since we started the merge, so we
+ if (currentLiveDocs.count() < prevLiveDocs.count()) {
+ // This means this segment received new deletes
+ // since we started the merge, so we
// must merge them:
for(int j=0;j();
- merge.readerClones = new ArrayList();
+ merge.readerLiveDocs = new ArrayList();
+
// This is try/finally to make sure merger's readers are
// closed:
boolean success = false;
@@ -3453,20 +3523,34 @@
// Hold onto the "live" reader; we will use this to
// commit merged deletes
- final SegmentReader reader = readerPool.get(info, true,
- context,
- -config.getReaderTermsIndexDivisor());
+ final SegmentReader reader = readerPool.get(info, context);
+
+ // Carefully pull the most recent live docs:
+ final SegmentReader delReader = readerPool.getIfExists(info, IOContext.Context.READ);
+ final BitVector liveDocs;
+ if (delReader != null) {
+ BitVector readerLiveDocs = (BitVector) delReader.getLiveDocs();
+ readerPool.release(delReader, false, IOContext.Context.READ);
+ if (readerLiveDocs != null) {
+ // We clone the del docs because other
+ // deletes may come in while we're merging. We
+ // need frozen deletes while merging, and then
+ // we carry over any new deletions in
+ // commitMergedDeletes.
+ liveDocs = (BitVector) readerLiveDocs.clone();
+ } else {
+ liveDocs = null;
+ }
+ } else {
+ liveDocs = (BitVector) reader.getLiveDocs();
+ }
+
+ merge.readerLiveDocs.add(liveDocs);
merge.readers.add(reader);
- // We clone the segment readers because other
- // deletes may come in while we're merging so we
- // need readers that will not change
- final SegmentReader clone = (SegmentReader) reader.clone(true);
- merge.readerClones.add(clone);
-
- if (clone.numDocs() > 0) {
- merger.add(clone);
- totDocCount += clone.numDocs();
+ if (liveDocs == null || liveDocs.count() > 0) {
+ merger.add(reader, liveDocs);
+ totDocCount += liveDocs == null ? reader.maxDoc() : liveDocs.count();
}
segUpto++;
}
@@ -3562,25 +3646,24 @@
}
final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
- final int termsIndexDivisor;
+
+ // TODO: in the non-realtime case, we may want to only
+ // keep deletes (it's costly to open entire reader
+ // when we just need deletes)
+
final boolean loadDocStores;
-
if (mergedSegmentWarmer != null) {
// Load terms index & doc stores so the segment
// warmer can run searches, load documents/term
// vectors
- termsIndexDivisor = config.getReaderTermsIndexDivisor();
loadDocStores = true;
} else {
- termsIndexDivisor = -1;
loadDocStores = false;
}
- // TODO: in the non-realtime case, we may want to only
- // keep deletes (it's costly to open entire reader
- // when we just need deletes)
-
- final SegmentReader mergedReader = readerPool.get(merge.info, loadDocStores, context, termsIndexDivisor);
+ // Force READ context because we merge deletes onto
+ // this reader:
+ final SegmentReader mergedReader = readerPool.get(merge.info, loadDocStores, new IOContext(IOContext.Context.READ));
try {
if (poolReaders && mergedSegmentWarmer != null) {
mergedSegmentWarmer.warm(mergedReader);
@@ -3592,7 +3675,7 @@
}
} finally {
synchronized(this) {
- if (readerPool.release(mergedReader)) {
+ if (readerPool.release(mergedReader, IOContext.Context.READ)) {
// Must checkpoint after releasing the
// mergedReader since it may have written a new
// deletes file:
@@ -3667,7 +3750,7 @@
}
} finally {
if (reader != null) {
- readerPool.release(reader);
+ readerPool.release(reader, false);
}
}
return buffer.toString();
Index: lucene/src/java/org/apache/lucene/index/codecs/DocValuesConsumer.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/DocValuesConsumer.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/codecs/DocValuesConsumer.java (working copy)
@@ -103,21 +103,20 @@
// TODO we need some kind of compatibility notation for values such
// that two slightly different segments can be merged eg. fixed vs.
// variable byte len or float32 vs. float64
- int docBase = 0;
boolean merged = false;
/*
* We ignore the given DocValues here and merge from the subReaders directly
* to support bulk copies on the DocValues Writer level. if this gets merged
* with MultiDocValues the writer can not optimize for bulk-copyable data
*/
- for (final IndexReader reader : mergeState.readers) {
- final IndexDocValues r = reader.docValues(mergeState.fieldInfo.name);
+ for(int readerIDX=0;readerIDX 0;
+
in = dir.openInput(IndexFileNames.segmentFileName(segment, codecId, FixedGapTermsIndexWriter.TERMS_INDEX_EXTENSION), context);
boolean success = false;
@@ -251,7 +251,7 @@
}
}
- public void loadTermsIndex() throws IOException {
+ private void loadTermsIndex() throws IOException {
if (coreIndex == null) {
coreIndex = new CoreFieldIndex(indexStart, termsStart, packedIndexStart, packedOffsetsStart, numIndexTerms);
}
@@ -375,30 +375,7 @@
}
}
- // Externally synced in IndexWriter
@Override
- public void loadTermsIndex(int indexDivisor) throws IOException {
- if (!indexLoaded) {
-
- if (indexDivisor < 0) {
- this.indexDivisor = -indexDivisor;
- } else {
- this.indexDivisor = indexDivisor;
- }
- this.totalIndexInterval = indexInterval * this.indexDivisor;
-
-      Iterator<FieldIndexData> it = fields.values().iterator();
- while(it.hasNext()) {
- it.next().loadTermsIndex();
- }
-
- indexLoaded = true;
- in.close();
- termBytesReader = termBytes.freeze(true);
- }
- }
-
- @Override
public FieldIndexEnum getFieldEnum(FieldInfo fieldInfo) {
final FieldIndexData fieldData = fields.get(fieldInfo);
if (fieldData.coreIndex == null) {
Index: lucene/src/java/org/apache/lucene/index/codecs/simpletext/SimpleTextFieldsReader.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/simpletext/SimpleTextFieldsReader.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/codecs/simpletext/SimpleTextFieldsReader.java (working copy)
@@ -593,10 +593,6 @@
}
@Override
- public void loadTermsIndex(int indexDivisor) {
- }
-
- @Override
public void close() throws IOException {
in.close();
}
Index: lucene/src/java/org/apache/lucene/index/codecs/preflex/PreFlexFields.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/preflex/PreFlexFields.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/codecs/preflex/PreFlexFields.java (working copy)
@@ -171,30 +171,6 @@
}
@Override
- synchronized public void loadTermsIndex(int indexDivisor) throws IOException {
- if (tis == null) {
- Directory dir0;
- if (si.getUseCompoundFile()) {
- // In some cases, we were originally opened when CFS
- // was not used, but then we are asked to open the
- // terms reader with index, the segment has switched
- // to CFS
-
- if (!(dir instanceof CompoundFileDirectory)) {
- dir0 = cfsReader = dir.openCompoundInput(IndexFileNames.segmentFileName(si.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION), context);
- } else {
- dir0 = dir;
- }
- dir0 = cfsReader;
- } else {
- dir0 = dir;
- }
-
- tis = new TermInfosReader(dir0, si.name, fieldInfos, context, indexDivisor);
- }
- }
-
- @Override
public void close() throws IOException {
if (tis != null) {
tis.close();
Index: lucene/src/java/org/apache/lucene/index/codecs/memory/MemoryCodec.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/memory/MemoryCodec.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/codecs/memory/MemoryCodec.java (working copy)
@@ -767,11 +767,6 @@
}
@Override
- public void loadTermsIndex(int indexDivisor) {
- // no op
- }
-
- @Override
public void close() {
// Drop ref to FST:
for(TermsReader termsReader : fields.values()) {
Index: lucene/src/java/org/apache/lucene/index/codecs/MergeState.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/MergeState.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/codecs/MergeState.java (working copy)
@@ -26,13 +26,25 @@
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
/** Holds common state used during segment merging
*
* @lucene.experimental */
public class MergeState {
+
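+  /** Pairs a sub-reader being merged with the liveDocs (deletions) to apply
+   *  while merging; the liveDocs may be a point-in-time snapshot rather than
+   *  the reader's own current deletes. */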
+ public static class IndexReaderAndLiveDocs {
+ public final IndexReader reader;
+ public final Bits liveDocs;
+
+ public IndexReaderAndLiveDocs(IndexReader reader, Bits liveDocs) {
+ this.reader = reader;
+ this.liveDocs = liveDocs;
+ }
+ }
+
public FieldInfos fieldInfos;
-  public List<IndexReader> readers;                 // Readers being merged
+  public List<IndexReaderAndLiveDocs> readers;      // Readers & liveDocs being merged
public int readerCount; // Number of readers being merged
public int[][] docMaps; // Maps docIDs around deletions
public int[] docBase; // New docID base per reader
Index: lucene/src/java/org/apache/lucene/index/codecs/TermsIndexReaderBase.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/TermsIndexReaderBase.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/codecs/TermsIndexReaderBase.java (working copy)
@@ -43,8 +43,6 @@
public abstract FieldIndexEnum getFieldEnum(FieldInfo fieldInfo);
- public abstract void loadTermsIndex(int indexDivisor) throws IOException;
-
public abstract void close() throws IOException;
   public abstract void getExtensions(Collection<String> extensions);
Index: lucene/src/java/org/apache/lucene/index/codecs/VariableGapTermsIndexReader.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/VariableGapTermsIndexReader.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/codecs/VariableGapTermsIndexReader.java (working copy)
@@ -23,7 +23,6 @@
import java.io.Writer; // for toDot
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
@@ -63,6 +62,7 @@
in = dir.openInput(IndexFileNames.segmentFileName(segment, codecId, VariableGapTermsIndexWriter.TERMS_INDEX_EXTENSION), new IOContext(context, true));
this.segment = segment;
boolean success = false;
+ assert indexDivisor == -1 || indexDivisor > 0;
try {
@@ -170,7 +170,7 @@
}
}
- public void loadTermsIndex() throws IOException {
+ private void loadTermsIndex() throws IOException {
if (fst == null) {
IndexInput clone = (IndexInput) in.clone();
clone.seek(indexStart);
@@ -205,28 +205,7 @@
}
}
- // Externally synced in IndexWriter
@Override
- public void loadTermsIndex(int indexDivisor) throws IOException {
- if (!indexLoaded) {
-
- if (indexDivisor < 0) {
- this.indexDivisor = -indexDivisor;
- } else {
- this.indexDivisor = indexDivisor;
- }
-
-      Iterator<FieldIndexData> it = fields.values().iterator();
- while(it.hasNext()) {
- it.next().loadTermsIndex();
- }
-
- indexLoaded = true;
- in.close();
- }
- }
-
- @Override
public FieldIndexEnum getFieldEnum(FieldInfo fieldInfo) {
final FieldIndexData fieldData = fields.get(fieldInfo);
if (fieldData.fst == null) {
Index: lucene/src/java/org/apache/lucene/index/codecs/BlockTermsReader.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/BlockTermsReader.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/index/codecs/BlockTermsReader.java (working copy)
@@ -165,11 +165,6 @@
}
@Override
- public void loadTermsIndex(int indexDivisor) throws IOException {
- indexReader.loadTermsIndex(indexDivisor);
- }
-
- @Override
public void close() throws IOException {
try {
try {
Index: lucene/src/java/org/apache/lucene/util/BitVector.java
===================================================================
--- lucene/src/java/org/apache/lucene/util/BitVector.java (revision 1147803)
+++ lucene/src/java/org/apache/lucene/util/BitVector.java (working copy)
@@ -353,6 +353,11 @@
} else {
readBits(input);
}
+
+ if (version < VERSION_DGAPS_CLEARED) {
+ invertAll();
+ }
+
assert verifyCount();
} finally {
input.close();
Index: lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java
===================================================================
--- lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java (revision 1147803)
+++ lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java (working copy)
@@ -272,14 +272,18 @@
}
void maybeThrowIOException() throws IOException {
+ maybeThrowIOException(null);
+ }
+
+ void maybeThrowIOException(String message) throws IOException {
if (randomIOExceptionRate > 0.0) {
int number = Math.abs(randomState.nextInt() % 1000);
if (number < randomIOExceptionRate*1000) {
if (LuceneTestCase.VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": MockDirectoryWrapper: now throw random exception");
+ System.out.println(Thread.currentThread().getName() + ": MockDirectoryWrapper: now throw random exception" + (message == null ? "" : " (" + message + ")"));
new Throwable().printStackTrace(System.out);
}
- throw new IOException("a random IOException");
+ throw new IOException("a random IOException" + (message == null ? "" : "(" + message + ")"));
}
}
}
Index: lucene/src/test-framework/org/apache/lucene/store/MockIndexOutputWrapper.java
===================================================================
--- lucene/src/test-framework/org/apache/lucene/store/MockIndexOutputWrapper.java (revision 1147803)
+++ lucene/src/test-framework/org/apache/lucene/store/MockIndexOutputWrapper.java (working copy)
@@ -126,7 +126,7 @@
// Maybe throw random exception; only do this on first
// write to a new file:
first = false;
- dir.maybeThrowIOException();
+ dir.maybeThrowIOException(name);
}
}