Index: lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (working copy) @@ -52,26 +52,26 @@ * segment. */ class BufferedDeletesStream { - + // TODO: maybe linked list? private final List deletes = new ArrayList(); - + // Starts at 1 so that SegmentInfos that have never had // deletes applied (whose bufferedDelGen defaults to 0) // will be correct: private long nextGen = 1; - + // used only by assert private Term lastDeleteTerm; - + private final InfoStream infoStream; private final AtomicLong bytesUsed = new AtomicLong(); private final AtomicInteger numTerms = new AtomicInteger(); - + public BufferedDeletesStream(InfoStream infoStream) { this.infoStream = infoStream; } - + // Appends a new packet of buffered deletes to the stream, // setting its generation: public synchronized long push(FrozenBufferedDeletes packet) { @@ -86,127 +86,169 @@ assert packet.anyDeletes() || packet.anyUpdates(); assert checkDeleteStats(); assert packet.delGen() < nextGen; - assert deletes.isEmpty() || deletes.get(deletes.size()-1).delGen() < packet.delGen() : "Delete packets must be in order"; + assert deletes.isEmpty() + || deletes.get(deletes.size() - 1).delGen() < packet.delGen() : "Delete packets must be in order"; deletes.add(packet); numTerms.addAndGet(packet.numTermDeletes); bytesUsed.addAndGet(packet.bytesUsed); if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + deletes.size() + " totBytesUsed=" + bytesUsed.get()); + infoStream.message( + "BD", + "push deletes " + packet + " delGen=" + packet.delGen() + + " packetCount=" + deletes.size() + " totBytesUsed=" + + bytesUsed.get()); } assert checkDeleteStats(); return packet.delGen(); } - + public synchronized void clear() { deletes.clear(); nextGen = 1; numTerms.set(0); bytesUsed.set(0); } - + public boolean any() { return bytesUsed.get() != 0; } - + public int numTerms() { return numTerms.get(); } - + public long bytesUsed() { return bytesUsed.get(); } - + public static class ApplyDeletesResult { // True if any actual deletes took place: public final boolean anyDeletes; - + // Current gen, for the merged segment: public final long gen; - + // If non-null, contains segments that are 100% deleted public final List allDeleted; - - ApplyDeletesResult(boolean anyDeletes, long gen, List allDeleted) { + + ApplyDeletesResult(boolean anyDeletes, long gen, + List allDeleted) { this.anyDeletes = anyDeletes; this.gen = gen; this.allDeleted = allDeleted; } } - + // Sorts SegmentInfos from smallest to biggest bufferedDelGen: private static final Comparator sortSegInfoByDelGen = new Comparator() { @Override public int compare(SegmentInfoPerCommit si1, SegmentInfoPerCommit si2) { - return Long.compare(si1.getBufferedDeletesGen(), si2.getBufferedDeletesGen()); + return Long.compare(si1.getBufferedDeletesGen(), + si2.getBufferedDeletesGen()); } }; - /** Resolves the buffered deleted Term/Query/docIDs, into - * actual deleted docIDs in the liveDocs MutableBits for - * each SegmentReader. 
*/ - public synchronized ApplyDeletesResult applyDeletes(IndexWriter.ReaderPool readerPool, List infos) throws IOException { + /** + * Resolves the buffered deleted Term/Query/docIDs, into actual deleted docIDs + * in the liveDocs MutableBits for each SegmentReader. + */ + public synchronized ApplyDeletesResult applyDeletes( + IndexWriter.ReaderPool readerPool, List infos) + throws IOException { final long t0 = System.currentTimeMillis(); - + if (infos.size() == 0) { return new ApplyDeletesResult(false, nextGen++, null); } - + assert checkDeleteStats(); - + if (!any()) { if (infoStream.isEnabled("BD")) { infoStream.message("BD", "applyDeletes: no deletes; skipping"); } return new ApplyDeletesResult(false, nextGen++, null); } - + if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "applyDeletes: infos=" + infos + " packetCount=" + deletes.size()); + infoStream.message("BD", "applyDeletes: infos=" + infos + " packetCount=" + + deletes.size()); } - + final long gen = nextGen++; - - List infos2 = new ArrayList(); - infos2.addAll(infos); + + List infos2 = new ArrayList( + infos); Collections.sort(infos2, sortSegInfoByDelGen); - + + List allDeleted = new ArrayList(); + ; + Set advanced = new HashSet(); + + boolean anyNewDeletes = false; + // go through packets forward and apply deletes and updates + anyNewDeletes |= handleUpdates(readerPool, infos2, advanced); + // go through packets backwards and apply deletes + anyNewDeletes = handleDeletes(readerPool, infos2, allDeleted, advanced); + + // mark all advanced segment infos + for (SegmentInfoPerCommit info : advanced) { + info.setBufferedDeletesGen(gen); + } + + assert checkDeleteStats(); + if (infoStream.isEnabled("BD")) { + infoStream.message("BD", + "applyDeletes took " + (System.currentTimeMillis() - t0) + " msec"); + } + // assert infos != segmentInfos || !any() : "infos=" + infos + + // " segmentInfos=" + segmentInfos + " any=" + any; + + return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted); + } + + private boolean handleDeletes(IndexWriter.ReaderPool readerPool, + List infos2, List allDeleted, + Set advanced) throws IOException { CoalescedDeletes coalescedDeletes = null; boolean anyNewDeletes = false; - - int infosIDX = infos2.size()-1; - int delIDX = deletes.size()-1; - - List allDeleted = null; - Set advanced = null; - + + int infosIDX = infos2.size() - 1; + int delIDX = deletes.size() - 1; + while (infosIDX >= 0) { - //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX); - - final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null; + // System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + + // infosIDX); + + final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) + : null; final SegmentInfoPerCommit info = infos2.get(infosIDX); final long segGen = info.getBufferedDeletesGen(); - + if (packet != null && packet.anyDeletes() && segGen < packet.delGen()) { - //System.out.println(" coalesce"); + // System.out.println(" coalesce"); if (coalescedDeletes == null) { coalescedDeletes = new CoalescedDeletes(); } if (!packet.isSegmentPrivate) { /* - * Only coalesce if we are NOT on a segment private del packet: the segment private del packet - * must only applied to segments with the same delGen. Yet, if a segment is already deleted - * from the SI since it had no more documents remaining after some del packets younger than - * its segPrivate packet (higher delGen) have been applied, the segPrivate packet has not been - * removed. 
+ * Only coalesce if we are NOT on a segment private del packet: the + * segment private del packet must only applied to segments with the + * same delGen. Yet, if a segment is already deleted from the SI since + * it had no more documents remaining after some del packets younger + * than its segPrivate packet (higher delGen) have been applied, the + * segPrivate packet has not been removed. */ coalescedDeletes.update(packet); } - + delIDX--; - } else if (packet != null && packet.anyDeletes() && segGen == packet.delGen()) { - assert packet.isSegmentPrivate : "Packet and Segments deletegen can only match on a segment private del packet gen=" + segGen; - //System.out.println(" eq"); - + } else if (packet != null && packet.anyDeletes() + && segGen == packet.delGen()) { + assert packet.isSegmentPrivate : "Packet and Segments deletegen can only match on a segment private del packet gen=" + + segGen; + // System.out.println(" eq"); + // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); final ReadersAndLiveDocs rld = readerPool.get(info, true); @@ -215,15 +257,18 @@ final boolean segAllDeletes; try { if (coalescedDeletes != null) { - //System.out.println(" del coalesced"); - delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); - delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); + // System.out.println(" del coalesced"); + delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, + reader); + delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), + rld, reader); } - //System.out.println(" del exact"); + // System.out.println(" del exact"); // Don't delete by Term here; DocumentsWriterPerThread // already did that on flush: delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader); - final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); + final int fullDelCount = rld.info.getDelCount() + + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); segAllDeletes = fullDelCount == rld.info.info.getDocCount(); } finally { @@ -231,40 +276,38 @@ readerPool.release(rld); } anyNewDeletes |= delCount > 0; - + if (segAllDeletes) { - if (allDeleted == null) { - allDeleted = new ArrayList(); - } allDeleted.add(info); } - + if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : "")); + infoStream.message("BD", "seg=" + info + " segGen=" + segGen + + " segDeletes=[" + packet + "]; coalesced deletes=[" + + (coalescedDeletes == null ? "null" : coalescedDeletes) + + "] newDelCount=" + delCount + + (segAllDeletes ? " 100% deleted" : "")); } - + if (coalescedDeletes == null) { coalescedDeletes = new CoalescedDeletes(); } /* - * Since we are on a segment private del packet we must not - * update the coalescedDeletes here! We can simply advance to the - * next packet and seginfo. + * Since we are on a segment private del packet we must not update the + * coalescedDeletes here! We can simply advance to the next packet and + * seginfo. 
*/ delIDX--; infosIDX--; - if (advanced == null) { - advanced = new HashSet(); - } advanced.add(info); - + } else if (packet != null && !packet.anyDeletes() && packet.anyUpdates()) { // ignore updates only packets delIDX--; } else { - //System.out.println(" gt"); - + // System.out.println(" gt"); + if (coalescedDeletes != null) { // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); @@ -273,118 +316,114 @@ int delCount = 0; final boolean segAllDeletes; try { - delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); - delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); - final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); + delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, + reader); + delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), + rld, reader); + final int fullDelCount = rld.info.getDelCount() + + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); segAllDeletes = fullDelCount == rld.info.info.getDocCount(); - } finally { + } finally { rld.release(reader); readerPool.release(rld); } anyNewDeletes |= delCount > 0; - + if (segAllDeletes) { - if (allDeleted == null) { - allDeleted = new ArrayList(); - } allDeleted.add(info); } - + if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : "")); + infoStream.message("BD", "seg=" + info + " segGen=" + segGen + + " coalesced deletes=[" + + (coalescedDeletes == null ? "null" : coalescedDeletes) + + "] newDelCount=" + delCount + + (segAllDeletes ? " 100% deleted" : "")); } - if (advanced == null) { - advanced = new HashSet(); + advanced.add(info); } - advanced.add(info); - } - + infosIDX--; } } + return anyNewDeletes; + } + + private boolean handleUpdates(IndexWriter.ReaderPool readerPool, + List infos2, Set advanced) + throws IOException { + boolean anyNewDeletes = false; - // go through deletes forward and apply updates - for (SegmentInfoPerCommit updateInfo : infos2) { - final long updateSegGen = updateInfo.getBufferedDeletesGen(); + for (SegmentInfoPerCommit info : infos2) { + final long segGen = info.getBufferedDeletesGen(); - for (FrozenBufferedDeletes updatePacket : deletes) { - if (updatePacket.anyUpdates() && updateSegGen <= updatePacket.delGen()) { - assert readerPool.infoIsLive(updateInfo); + for (int delIdx = 0; delIdx < deletes.size(); delIdx++) { + FrozenBufferedDeletes packet = deletes.get(delIdx); + assert readerPool.infoIsLive(info); + if (segGen <= packet.delGen() && packet.anyUpdates()) { // we need to reopen the reader every time, to include previous - // updates when applying new ones - final ReadersAndLiveDocs rld = readerPool.get(updateInfo, true); + // changes when applying new ones + final ReadersAndLiveDocs rld = readerPool.get(info, true); final SegmentReader reader = rld.getReader(IOContext.READ); - final boolean exactGen = updateSegGen == updatePacket.delGen(); try { - anyNewDeletes |= applyTermUpdates(updatePacket.allUpdates, rld, - reader, exactGen); + final boolean exactGen = (segGen == packet.delGen()); + anyNewDeletes |= applyTermUpdates(packet.allUpdates, rld, reader, + exactGen); } finally { rld.release(reader); readerPool.release(rld); } - if (advanced == null) { - advanced = new HashSet(); - } - advanced.add(updateInfo); } } + advanced.add(info); + } - - if (advanced != 
null) { - for (SegmentInfoPerCommit info : advanced) { - info.setBufferedDeletesGen(gen); - } - } - - assert checkDeleteStats(); - if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec"); - } - // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any; - - return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted); + return anyNewDeletes; } synchronized long getNextGen() { return nextGen++; } - + // Lock order IW -> BD - /* Removes any BufferedDeletes that we no longer need to - * store because all segments in the index have had the - * deletes applied. */ + /* + * Removes any BufferedDeletes that we no longer need to store because all + * segments in the index have had the deletes applied. + */ public synchronized void prune(SegmentInfos segmentInfos) { assert checkDeleteStats(); long minGen = Long.MAX_VALUE; - for(SegmentInfoPerCommit info : segmentInfos) { + for (SegmentInfoPerCommit info : segmentInfos) { minGen = Math.min(info.getBufferedDeletesGen(), minGen); } - + if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size()); + infoStream.message("BD", "prune sis=" + segmentInfos + " minGen=" + + minGen + " packetCount=" + deletes.size()); } final int limit = deletes.size(); - for(int delIDX=0;delIDX= minGen) { prune(delIDX); assert checkDeleteStats(); return; } } - + // All deletes pruned prune(limit); assert !any(); assert checkDeleteStats(); } - + private synchronized void prune(int count) { if (count > 0) { if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain"); + infoStream.message("BD", "pruneDeletes: prune " + count + " packets; " + + (deletes.size() - count) + " packets remain"); } - for(int delIDX=0;delIDX= 0; @@ -394,26 +433,28 @@ deletes.subList(0, count).clear(); } } - + // Delete by Term - private synchronized long applyTermDeletes(Iterable termsIter, ReadersAndLiveDocs rld, SegmentReader reader) throws IOException { + private synchronized long applyTermDeletes(Iterable termsIter, + ReadersAndLiveDocs rld, SegmentReader reader) throws IOException { long delCount = 0; Fields fields = reader.fields(); if (fields == null) { // This reader has no postings return 0; } - + TermsEnum termsEnum = null; - + String currentField = null; DocsEnum docs = null; - + assert checkDeleteTerm(null); - + boolean any = false; - - //System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader); + + // System.out.println(Thread.currentThread().getName() + + // " del terms reader=" + reader); for (Term term : termsIter) { // Since we visit terms sorted, we gain performance // by re-using the same TermsEnum and seeking only @@ -428,30 +469,32 @@ termsEnum = null; } } - + if (termsEnum == null) { continue; } assert checkDeleteTerm(term); - + // System.out.println(" term=" + term); - + if (termsEnum.seekExact(term.bytes(), false)) { // we don't need term frequencies for this - DocsEnum docsEnum = termsEnum.docs(rld.getLiveDocs(), docs, DocsEnum.FLAG_NONE); - //System.out.println("BDS: got docsEnum=" + docsEnum); - + DocsEnum docsEnum = termsEnum.docs(rld.getLiveDocs(), docs, + DocsEnum.FLAG_NONE); + // System.out.println("BDS: got docsEnum=" + docsEnum); + if (docsEnum != null) { while (true) { final int docID = docsEnum.nextDoc(); - 
//System.out.println(Thread.currentThread().getName() + " del term=" + term + " doc=" + docID); + // System.out.println(Thread.currentThread().getName() + + // " del term=" + term + " doc=" + docID); if (docID == DocIdSetIterator.NO_MORE_DOCS) { break; - } + } // NOTE: there is no limit check on the docID // when deleting by Term (unlike by Query) // because on flush we apply all Term deletes to - // each segment. So all Term deleting here is + // each segment. So all Term deleting here is // against prior segments: if (!any) { rld.initWritableLiveDocs(); @@ -464,10 +507,10 @@ } } } - + return delCount; } - + private synchronized boolean applyTermUpdates( SortedSet packetUpdates, ReadersAndLiveDocs rld, SegmentReader reader, boolean exactSegment) throws IOException { @@ -478,9 +521,9 @@ } assert checkDeleteTerm(null); - + UpdatedSegmentData updatedSegmentData = new UpdatedSegmentData(reader, - packetUpdates, exactSegment); + packetUpdates, exactSegment, infoStream); if (updatedSegmentData.hasUpdates()) { rld.setLiveUpdates(updatedSegmentData); @@ -489,39 +532,42 @@ return false; } - + public static class QueryAndLimit { public final Query query; public final int limit; + public QueryAndLimit(Query query, int limit) { this.query = query; this.limit = limit; } } - + // Delete by query - private static long applyQueryDeletes(Iterable queriesIter, ReadersAndLiveDocs rld, final SegmentReader reader) throws IOException { + private static long applyQueryDeletes(Iterable queriesIter, + ReadersAndLiveDocs rld, final SegmentReader reader) throws IOException { long delCount = 0; final AtomicReaderContext readerContext = reader.getContext(); boolean any = false; for (QueryAndLimit ent : queriesIter) { Query query = ent.query; int limit = ent.limit; - final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet(readerContext, reader.getLiveDocs()); + final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet( + readerContext, reader.getLiveDocs()); if (docs != null) { final DocIdSetIterator it = docs.iterator(); if (it != null) { - while(true) { + while (true) { int doc = it.nextDoc(); if (doc >= limit) { break; } - + if (!any) { rld.initWritableLiveDocs(); any = true; } - + if (rld.delete(doc)) { delCount++; } @@ -529,30 +575,35 @@ } } } - + return delCount; } - + // used only by assert private boolean checkDeleteTerm(Term term) { if (term != null) { - assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term; + assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0 : "lastTerm=" + + lastDeleteTerm + " vs term=" + term; } - // TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert - lastDeleteTerm = term == null ? null : new Term(term.field(), BytesRef.deepCopyOf(term.bytes)); + // TODO: we re-use term now in our merged iterable, but we shouldn't clone, + // instead copy for this assert + lastDeleteTerm = term == null ? 
null : new Term(term.field(), + BytesRef.deepCopyOf(term.bytes)); return true; } - + // only for assert private boolean checkDeleteStats() { int numTerms2 = 0; long bytesUsed2 = 0; - for(FrozenBufferedDeletes packet : deletes) { + for (FrozenBufferedDeletes packet : deletes) { numTerms2 += packet.numTermDeletes; bytesUsed2 += packet.bytesUsed; } - assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get(); - assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed; + assert numTerms2 == numTerms.get() : "numTerms2=" + numTerms2 + " vs " + + numTerms.get(); + assert bytesUsed2 == bytesUsed.get() : "bytesUsed2=" + bytesUsed2 + " vs " + + bytesUsed; return true; } } Index: lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java (working copy) @@ -17,8 +17,9 @@ * limitations under the License. */ -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -37,7 +38,7 @@ class BufferedUpdates { final AtomicInteger numTermUpdates = new AtomicInteger(); - final SortedFieldsUpdates terms = new SortedFieldsUpdates(); + final ConcurrentSkipListMap> terms = new ConcurrentSkipListMap>(); public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE); @@ -73,21 +74,11 @@ } } - public void addTerm(Term term, FieldsUpdate update) { - SortedSet current = terms.get(term); - //if (current != null && update.docIDUpto < current.peek().docIDUpto) { - // Only record the new number if it's greater than the - // current one. This is important because if multiple - // threads are replacing the same doc at nearly the - // same time, it's possible that one thread that got a - // higher docID is scheduled before the other - // threads. If we blindly replace than we can - // incorrectly get both docs indexed. - //return; - //} + public synchronized void addTerm(Term term, FieldsUpdate update) { + List current = terms.get(term); if (current == null) { - current = new TreeSet(); + current = new ArrayList(1); terms.put(term, current); bytesUsed.addAndGet(BufferedDeletes.BYTES_PER_DEL_TERM + term.bytes.length Index: lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java (working copy) @@ -38,10 +38,12 @@ void update(FrozenBufferedDeletes in) { iterables.add(in.termsIterable()); - - for(int queryIdx=0;queryIdx replacedFields; final Analyzer analyzer; final int docIdUpto; - final long timeStamp; + final int updateNumber; IndexDocument fields; Directory directory; @@ -64,11 +64,13 @@ * The fields to use in the update operation. * @param analyzer * The analyzer to use in the update. - * @param docIDUpto - * Document ID of the last document added before this field update + * @param docIdUpto + * The doc ID of the last document added before this update. + * @param updateNumber + * The running number of this update for the current segment. 
*/ public FieldsUpdate(Term term, Operation operation, IndexDocument fields, - Analyzer analyzer, int docIDUpto, long timeStamp) { + Analyzer analyzer, int docIdUpto, int updateNumber) { this.term = term; this.fields = fields; this.operation = operation; @@ -84,8 +86,8 @@ } } this.analyzer = analyzer; - this.docIdUpto = docIDUpto; - this.timeStamp = timeStamp; + this.docIdUpto = docIdUpto; + this.updateNumber = updateNumber; } /** @@ -100,23 +102,20 @@ this.replacedFields = other.replacedFields; this.analyzer = other.analyzer; this.docIdUpto = other.docIdUpto; - this.timeStamp = other.timeStamp; + this.updateNumber = other.updateNumber; this.directory = other.directory; this.segmentInfo = other.segmentInfo; } - /* Order FrieldsUpdate by increasing docIDUpto */ @Override public int compareTo(FieldsUpdate other) { - int diff = this.docIdUpto - other.docIdUpto; - if (diff == 0) { - if (this.timeStamp < other.timeStamp) { - return -1; - } else if (this.timeStamp > other.timeStamp) { - return 1; - } - } - return diff; + return this.updateNumber - other.updateNumber; } - + + @Override + public String toString() { + return "FieldsUpdate [term=" + term + ", operation=" + operation + + ", docIdUpto=" + docIdUpto + ", updateNumber=" + updateNumber + "]"; + } + } Index: lucene/core/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java (working copy) @@ -359,7 +359,7 @@ assert !writeOffsets || writePositions; final Map segDeletes; - if (state.segDeletes != null && state.segDeletes.terms.size() > 0) { + if (state.hasDeletesWithoutUpdates() && state.segDeletes.terms.size() > 0) { segDeletes = state.segDeletes.terms; } else { segDeletes = null; @@ -476,7 +476,7 @@ if (state.liveDocs == null) { state.liveDocs = docState.docWriter.codec.liveDocsFormat().newLiveDocs(state.segmentInfo.getDocCount()); } - if (state.liveDocs.get(docID)) { + if (state.hasDeletesWithoutUpdates() && state.liveDocs.get(docID)) { state.delCountOnFlush++; state.liveDocs.clear(docID); } Index: lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (working copy) @@ -55,13 +55,17 @@ // a segment private deletes. 
in that case is should // only have Queries - // An sorted set of updates + // A sorted set of updates final SortedSet allUpdates; - public FrozenBufferedDeletes(BufferedDeletes deletes, BufferedUpdates updates, boolean isSegmentPrivate) { + public FrozenBufferedDeletes(BufferedDeletes deletes, + BufferedUpdates updates, boolean isSegmentPrivate) { this.isSegmentPrivate = isSegmentPrivate; int localBytesUsed = 0; + + // freeze deletes if (deletes != null) { + // arrange terms and queries in arrays assert !isSegmentPrivate || deletes.terms.size() == 0 : "segment private package should only have del queries"; Term termsArray[] = deletes.terms.keySet().toArray( new Term[deletes.terms.size()]); @@ -97,10 +101,10 @@ allUpdates = null; } else { allUpdates = new TreeSet<>(); - for (SortedSet list : updates.terms.values()) { + for (List list : updates.terms.values()) { allUpdates.addAll(list); } - localBytesUsed += 100; + localBytesUsed += updates.bytesUsed.get(); } bytesUsed = localBytesUsed; Index: lucene/core/src/java/org/apache/lucene/index/IndexFileNames.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexFileNames.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/IndexFileNames.java (working copy) @@ -242,7 +242,7 @@ * All files created by codecs much match this pattern (checked in * SegmentInfo). */ - public static final Pattern CODEC_FILE_PATTERN = Pattern.compile("_[a-z0-9]+(_.*)?\\..*"); + public static final Pattern CODEC_FILE_PATTERN = Pattern.compile("_[_]?[a-z0-9]+(_.*)?\\..*"); /** Returns true if the file denotes an updated segment. */ public static boolean isUpdatedSegmentFile(String file) { Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -31,6 +31,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.Analyzer; @@ -227,6 +228,9 @@ // prepareCommit() & before commit()) volatile long pendingCommitChangeCount; + volatile AtomicBoolean deletesPending; // set when there are pending deletes + // to be flushed before adding updates + private Collection filesToCommit; final SegmentInfos segmentInfos; // the segments @@ -262,8 +266,6 @@ final ReaderPool readerPool = new ReaderPool(); final BufferedDeletesStream bufferedDeletesStream; - private boolean updatesPending; - // This is a "write once" variable (like the organic dye // on a DVD-R that may or may not be heated by a laser and // then cooled to permanently record the event): it's @@ -682,6 +684,7 @@ bufferedDeletesStream = new BufferedDeletesStream(infoStream); poolReaders = config.getReaderPooling(); + deletesPending = new AtomicBoolean(false); writeLock = directory.makeLock(WRITE_LOCK_NAME); @@ -1344,6 +1347,9 @@ try { anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm); success = true; + if (delTerm != null) { + deletesPending.set(true); + } } finally { if (!success) { if (infoStream.isEnabled("IW")) { @@ -1435,6 +1441,12 @@ public void updateFields(FieldsUpdate.Operation operation, Term term, IndexDocument fields, Analyzer analyzer) throws IOException { ensureOpen(); + + if (deletesPending.get()) { + commit(); 
+ deletesPending.set(false); + } + try { boolean success = false; boolean anySegmentFlushed = false; @@ -1442,7 +1454,6 @@ anySegmentFlushed = docWriter.updateFields(term, operation, fields, analyzer, globalFieldNumberMap); success = true; - updatesPending = true; } finally { if (!success) { if (infoStream.isEnabled("IW")) { @@ -1478,6 +1489,9 @@ ensureOpen(); try { docWriter.deleteTerms(term); + if (term != null) { + deletesPending.set(true); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term)"); } @@ -1581,6 +1595,9 @@ ensureOpen(); try { docWriter.deleteTerms(terms); + if (terms != null && terms.length > 0) { + deletesPending.set(true); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term..)"); } @@ -1605,6 +1622,9 @@ ensureOpen(); try { docWriter.deleteQueries(query); + if (query != null) { + deletesPending.set(true); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Query)"); } @@ -1630,6 +1650,9 @@ ensureOpen(); try { docWriter.deleteQueries(queries); + if (queries != null && queries.length > 0) { + deletesPending.set(true); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Query..)"); } @@ -1705,6 +1728,9 @@ try { anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term); success = true; + if (term != null) { + deletesPending.set(true); + } } finally { if (!success) { if (infoStream.isEnabled("IW")) { @@ -3140,7 +3166,6 @@ deleter.decRef(filesToCommit); filesToCommit = null; pendingCommit = null; - updatesPending = false; notifyAll(); } Index: lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java (working copy) @@ -119,4 +119,17 @@ segUpdates = state.segUpdates; delCountOnFlush = state.delCountOnFlush; } + + public boolean hasDeletesWithoutUpdates() { + if (segDeletes == null) { + return false; + } + if (segUpdates == null) { + return true; + } + if (segUpdates.any()) { + return false; + } + return true; + } } Index: lucene/core/src/java/org/apache/lucene/index/SortedFieldsUpdates.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SortedFieldsUpdates.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/SortedFieldsUpdates.java (working copy) @@ -1,25 +0,0 @@ -package org.apache.lucene.index; - -import java.util.SortedSet; -import java.util.TreeMap; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -public class SortedFieldsUpdates extends TreeMap> { - -} Index: lucene/core/src/java/org/apache/lucene/index/UpdatedSegmentData.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/UpdatedSegmentData.java (revision 1481938) +++ lucene/core/src/java/org/apache/lucene/index/UpdatedSegmentData.java (working copy) @@ -19,6 +19,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; /* * Licensed to the Apache Software Foundation (ASF) under one or more @@ -45,12 +47,13 @@ static final FieldInfos EMPTY_FIELD_INFOS = new FieldInfos(new FieldInfo[0]); /** Updates mapped by doc ID, for each do sorted list of updates. */ - private TreeMap>> docIdToUpdatesMap; - private HashMap> updatesToDocIdMap; - private LinkedHashMap allApplied; + private final TreeMap>> docIdToUpdatesMap; + private final HashMap> updatesToDocIdMap; + private final LinkedHashMap allApplied; + private final boolean exactSegment; + private final InfoStream infoStream; private long generation; - private boolean exactSegment; private Map fieldGenerationReplacments; @@ -62,15 +65,18 @@ private Analyzer analyzer; UpdatedSegmentData(SegmentReader reader, - SortedSet packetUpdates, boolean exactSegment) - throws IOException { + SortedSet packetUpdates, boolean exactSegment, + InfoStream infoStream) throws IOException { docIdToUpdatesMap = new TreeMap<>(); updatesToDocIdMap = new HashMap<>(); + allApplied = new LinkedHashMap<>(); this.exactSegment = exactSegment; + this.infoStream = infoStream; - allApplied = new LinkedHashMap<>(); - for (FieldsUpdate update : packetUpdates) { + if (infoStream.isEnabled("USD")) { + infoStream.message("USD", "update: " + update); + } // add updates according to the base reader DocsEnum docsEnum = reader.termDocsEnum(update.term); if (docsEnum != null) { @@ -101,34 +107,51 @@ allApplied.put(update, new UpdateAtomicReader(update.directory, update.segmentInfo, IOContext.DEFAULT)); } - + if (infoStream.isEnabled("USD")) { + infoStream.message("USD", "done init"); + } } private void addUpdate(int docId, FieldsUpdate fieldsUpdate) { if (exactSegment && docId > fieldsUpdate.docIdUpto) { return; } - TreeMap> prevUpdates = docIdToUpdatesMap.get(docId); - if (prevUpdates == null) { - prevUpdates = new TreeMap<>(); - docIdToUpdatesMap.put(docId, prevUpdates); - } else if (fieldsUpdate.operation == Operation.REPLACE_FIELDS) { - // set ignored fields in previous updates - for (Entry> addIgnore : prevUpdates.entrySet()) { - if (addIgnore.getValue() == null) { - prevUpdates.put(addIgnore.getKey(), new HashSet<>(fieldsUpdate.replacedFields)); - } else { - addIgnore.getValue().addAll(fieldsUpdate.replacedFields); + synchronized (docIdToUpdatesMap) { + TreeMap> prevUpdates = docIdToUpdatesMap + .get(docId); + if (prevUpdates == null) { + prevUpdates = new TreeMap<>(); + docIdToUpdatesMap.put(docId, prevUpdates); + if (infoStream.isEnabled("USD")) { + infoStream.message("USD", "adding to doc " + docId); } + } else if (fieldsUpdate.operation == Operation.REPLACE_FIELDS) { + // set ignored fields in previous updates + for (Entry> prev : prevUpdates.entrySet()) { + if (prev.getValue() == null) { + prevUpdates.put(prev.getKey(), new HashSet<>( + fieldsUpdate.replacedFields)); + if (infoStream.isEnabled("USD")) { + infoStream.message("USD", "new ignored fields " + + fieldsUpdate.replacedFields); + } + } 
else { + prev.getValue().addAll(fieldsUpdate.replacedFields); + if (infoStream.isEnabled("USD")) { + infoStream.message("USD", "adding ignored fields " + + fieldsUpdate.replacedFields); + } + } + } } + prevUpdates.put(fieldsUpdate, null); + List prevDocIds = updatesToDocIdMap.get(fieldsUpdate); + if (prevDocIds == null) { + prevDocIds = new ArrayList(); + updatesToDocIdMap.put(fieldsUpdate, prevDocIds); + } + prevDocIds.add(docId); } - prevUpdates.put(fieldsUpdate, null); - List prevDocIds = updatesToDocIdMap.get(fieldsUpdate); - if (prevDocIds == null) { - prevDocIds = new ArrayList(); - updatesToDocIdMap.put(fieldsUpdate, prevDocIds); - } - prevDocIds.add(docId); } boolean hasUpdates() { @@ -158,7 +181,8 @@ */ private void nextDocUpdate() { if (updatesIterator.hasNext()) { - Entry>> docUpdates = updatesIterator.next(); + Entry>> docUpdates = updatesIterator + .next(); nextDocID = docUpdates.getKey(); nextUpdate = docUpdates.getValue(); } else { @@ -177,42 +201,50 @@ AtomicReader nextReader() throws IOException { AtomicReader toReturn = null; - if (currDocID < nextDocID) { - // empty documents reader required - toReturn = new UpdateAtomicReader(nextDocID - currDocID); - currDocID = nextDocID; - } else if (currDocID < numDocs) { - // get the an actual updates reader... - FieldsUpdate update = nextUpdate.firstEntry().getKey(); - Set ignore = nextUpdate.remove(update); - toReturn = allApplied.get(update); - - // ... and if done for this document remove from updates map - if (nextUpdate.isEmpty()) { - updatesIterator.remove(); - } - - // add generation replacements if exist - if (update.replacedFields != null) { - if (fieldGenerationReplacments == null) { - fieldGenerationReplacments = new HashMap(); + boolean success = false; + try { + if (currDocID < nextDocID) { + // empty documents reader required + toReturn = new UpdateAtomicReader(nextDocID - currDocID); + currDocID = nextDocID; + } else if (currDocID < numDocs) { + // get the an actual updates reader... + FieldsUpdate update = nextUpdate.firstEntry().getKey(); + nextUpdate.remove(update); + toReturn = allApplied.get(update); + + // ... 
and if done for this document remove from updates map + if (nextUpdate.isEmpty()) { + updatesIterator.remove(); } - for (String fieldName : update.replacedFields) { - FieldGenerationReplacements fieldReplacement = fieldGenerationReplacments - .get(fieldName); - if (fieldReplacement == null) { - fieldReplacement = new FieldGenerationReplacements(); - fieldGenerationReplacments.put(fieldName, fieldReplacement); + + // add generation replacements if exist + if (update.replacedFields != null) { + if (fieldGenerationReplacments == null) { + fieldGenerationReplacments = new HashMap(); } - fieldReplacement.set(currDocID, generation); + for (String fieldName : update.replacedFields) { + FieldGenerationReplacements fieldReplacement = fieldGenerationReplacments + .get(fieldName); + if (fieldReplacement == null) { + fieldReplacement = new FieldGenerationReplacements(); + fieldGenerationReplacments.put(fieldName, fieldReplacement); + } + fieldReplacement.set(currDocID, generation); + } } + // move to next doc id + nextDocUpdate(); + currDocID++; } - // move to next doc id - nextDocUpdate(); - currDocID++; + success = true; + return toReturn; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(toReturn); + } } - return toReturn; } boolean isEmpty() { @@ -238,7 +270,7 @@ */ UpdateAtomicReader(Directory fieldsDir, SegmentInfo segmentInfo, IOContext context) throws IOException { - core = new SegmentCoreReaders(null, segmentInfo, -1, context, -1); + core = new SegmentCoreReaders(null, segmentInfo, -1, context, 1); numDocs = 1; } @@ -254,13 +286,13 @@ if (core == null) { return false; } - DocsEnum termDocsEnum = termDocsEnum(term); - if (termDocsEnum == null) { + Terms terms = terms(term.field); + if (terms == null) { return false; } - return termDocsEnum.nextDoc() != DocIdSetIterator.NO_MORE_DOCS; + return terms.iterator(null).seekExact(term.bytes(), false); } - + @Override public Fields fields() throws IOException { if (core == null) { Index: lucene/core/src/test/org/apache/lucene/index/TestFieldReplacements.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestFieldReplacements.java (revision 1481938) +++ lucene/core/src/test/org/apache/lucene/index/TestFieldReplacements.java (working copy) @@ -773,8 +773,9 @@ } public void testReplaceLayers() throws IOException { - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( - TEST_VERSION_CURRENT, new MockAnalyzer(random()))); + IndexWriterConfig indexWriterConfig = newIndexWriterConfig( + TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, indexWriterConfig); FieldType fieldType = new FieldType(); fieldType.setIndexed(true); @@ -784,6 +785,7 @@ Document doc0 = new Document(); doc0.add(new StoredField("f1", "a", fieldType)); + doc0.add(new StoredField("f2", "a", fieldType)); writer.addDocument(doc0); // add f2:b @@ -791,7 +793,7 @@ fields1.add(new StoredField("f2", "b", fieldType)); writer.updateFields(Operation.ADD_FIELDS, new Term("f1", "a"), fields1); - // remove f2:b and add f2:c + // remove f2:a and f2:b, add f2:c Document fields2 = new Document(); fields2.add(new StoredField("f2", "c", fieldType)); writer.updateFields(Operation.REPLACE_FIELDS, new Term("f2", "b"), fields2); @@ -801,12 +803,17 @@ fields3.add(new StoredField("f2", "d", fieldType)); writer.updateFields(Operation.ADD_FIELDS, new Term("f2", "b"), fields3); + // do nothing since f2:a was removed + writer.deleteDocuments(new Term("f2", "a")); + 
writer.close(); DirectoryReader directoryReader = DirectoryReader.open(dir); final AtomicReader atomicReader = directoryReader.leaves().get(0).reader(); printField(atomicReader, "f1"); + assertEquals("wrong number of documents", 1, directoryReader.numDocs()); + // check indexed fields final DocsAndPositionsEnum termPositionsA = atomicReader .termPositionsEnum(new Term("f1", "a")); @@ -816,6 +823,12 @@ assertEquals("wrong doc id", DocIdSetIterator.NO_MORE_DOCS, termPositionsA.nextDoc()); + final DocsAndPositionsEnum termPositionsA2 = atomicReader + .termPositionsEnum(new Term("f2", "a")); + assertNotNull("no positions for term", termPositionsA2); + assertEquals("wrong doc id", DocIdSetIterator.NO_MORE_DOCS, + termPositionsA2.nextDoc()); + final DocsAndPositionsEnum termPositionsB = atomicReader .termPositionsEnum(new Term("f2", "b")); assertNotNull("no positions for term", termPositionsB); @@ -826,6 +839,7 @@ .termPositionsEnum(new Term("f2", "c")); assertNotNull("no positions for term", termPositionsC); assertEquals("wrong doc id", 0, termPositionsC.nextDoc()); + // 100000 == 2 * StackedDocsEnum.STACKED_SEGMENT_POSITION_INCREMENT assertEquals("wrong position", 100000, termPositionsC.nextPosition()); assertEquals("wrong doc id", DocIdSetIterator.NO_MORE_DOCS, termPositionsC.nextDoc()); @@ -872,7 +886,7 @@ } public void printIndexes() throws IOException { - File outDir = new File("D:/temp/ifu/compare/scenario/b"); + File outDir = new File("D:/temp/ifu/compare/scenario/a"); outDir.mkdirs(); for (int i = 0; i < 42; i++) {
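
Note on the BufferedDeletesStream refactoring above: applyDeletes is split into a forward pass over the packets for field updates (handleUpdates) and a backward pass for deletes (handleDeletes), and every segment that was touched is advanced to the freshly allocated generation at the end. The standalone sketch below only illustrates that generation bookkeeping, using the rule the patch applies in the update pass (a packet is considered for a segment when the segment's recorded generation is not newer than the packet's delGen). Packet, Segment, DelGenSketch, push and apply are stand-in names for illustration, not Lucene classes.

    import java.util.ArrayList;
    import java.util.List;

    // Minimal model of the delGen bookkeeping; not Lucene code.
    class DelGenSketch {
      static class Packet { final long delGen; Packet(long delGen) { this.delGen = delGen; } }
      static class Segment { long bufferedDelGen; } // defaults to 0, so a fresh segment sees every packet

      private long nextGen = 1; // mirrors BufferedDeletesStream.nextGen starting at 1

      long push(List<Packet> packets) {
        Packet p = new Packet(nextGen++); // each pushed packet gets the next generation
        packets.add(p);
        return p.delGen;
      }

      void apply(List<Packet> packets, List<Segment> segments) {
        final long gen = nextGen++;
        for (Segment seg : segments) {
          for (Packet p : packets) {
            if (seg.bufferedDelGen <= p.delGen) {
              // resolve the packet's buffered terms/queries/updates against this segment here
            }
          }
          seg.bufferedDelGen = gen; // segment has now seen everything up to gen
        }
      }

      public static void main(String[] args) {
        DelGenSketch stream = new DelGenSketch();
        List<Packet> packets = new ArrayList<>();
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment());
        stream.push(packets);
        stream.apply(packets, segments);
        System.out.println("segment advanced to gen " + segments.get(0).bufferedDelGen);
      }
    }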
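
Note on the IndexWriter changes above: a deletesPending AtomicBoolean is set on the delete and update-document paths and checked in updateFields, which commits first so that pending deletes are flushed before new field updates are buffered. A compressed sketch of that gate follows; Writer and flushPendingDeletes are hypothetical stand-ins for IndexWriter and the commit() call in the patch.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of the "flush deletes before buffering updates" gate; not the Lucene API.
    class Writer {
      private final AtomicBoolean deletesPending = new AtomicBoolean(false);

      void deleteDocuments(String term) {
        // buffer the delete ...
        if (term != null) {
          deletesPending.set(true);
        }
      }

      void updateFields(String term, String fields) {
        if (deletesPending.get()) {
          flushPendingDeletes();   // the patch calls commit() here
          deletesPending.set(false);
        }
        // buffer the field update ...
      }

      private void flushPendingDeletes() {
        // placeholder for applying and clearing the buffered deletes
      }
    }

The get()/set() pair mirrors the patch as written; it is a check-then-act, so an atomic getAndSet(false) would be needed to keep two concurrent updateFields callers from both triggering the flush.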