Index: lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java =================================================================== --- lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java (revision 1519637) +++ lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java (working copy) @@ -62,7 +62,7 @@ boolean fixedLength; long minValue; long numValues; - }; + } final int maxDoc; final IndexInput data; @@ -70,7 +70,7 @@ final Map fields = new HashMap(); public SimpleTextDocValuesReader(SegmentReadState state, String ext) throws IOException { - //System.out.println("dir=" + state.directory + " seg=" + state.segmentInfo.name + " ext=" + ext); + // System.out.println("dir=" + state.directory + " seg=" + state.segmentInfo.name + " file=" + IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext)); data = state.directory.openInput(IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext), state.context); maxDoc = state.segmentInfo.getDocCount(); while(true) { @@ -82,8 +82,6 @@ assert startsWith(FIELD) : scratch.utf8ToString(); String fieldName = stripPrefix(FIELD); //System.out.println(" field=" + fieldName); - FieldInfo fieldInfo = state.fieldInfos.fieldInfo(fieldName); - assert fieldInfo != null; OneField field = new OneField(); fields.put(fieldName, field); Index: lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java =================================================================== --- lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java (revision 1519637) +++ lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java (working copy) @@ -55,7 +55,7 @@ private final Set fieldsSeen = new HashSet(); // for asserting public SimpleTextDocValuesWriter(SegmentWriteState state, String ext) throws IOException { - //System.out.println("WRITE: " + IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext) + " " + state.segmentInfo.getDocCount() + " docs"); + // System.out.println("WRITE: " + IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext) + " " + state.segmentInfo.getDocCount() + " docs"); data = state.directory.createOutput(IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext), state.context); numDocs = state.segmentInfo.getDocCount(); } Index: lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java =================================================================== --- lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java (working copy) @@ -22,13 +22,13 @@ import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Map; -import java.util.ServiceLoader; // javadocs +import java.util.ServiceLoader; import java.util.TreeMap; -import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.DocValuesProducer; -import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.NumericDocValues; @@ -75,11 +75,10 @@ } @Override - public 
final DocValuesConsumer fieldsConsumer(SegmentWriteState state) - throws IOException { + public final DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { return new FieldsWriter(state); } - + static class ConsumerAndSuffix implements Closeable { DocValuesConsumer consumer; int suffix; @@ -96,7 +95,7 @@ private final Map suffixes = new HashMap(); private final SegmentWriteState segmentWriteState; - + public FieldsWriter(SegmentWriteState state) { segmentWriteState = state; } @@ -122,7 +121,14 @@ } private DocValuesConsumer getInstance(FieldInfo field) throws IOException { - final DocValuesFormat format = getDocValuesFormatForField(field.name); + final DocValuesFormat format; + if (segmentWriteState.isFieldUpdate) { + final String formatName = field.getAttribute(PER_FIELD_FORMAT_KEY); + assert formatName != null : "invalid null FORMAT_KEY for field=\"" + field.name + "\" (field updates)"; + format = DocValuesFormat.forName(formatName); + } else { + format = getDocValuesFormatForField(field.name); + } if (format == null) { throw new IllegalStateException("invalid null DocValuesFormat for field=\"" + field.name + "\""); } @@ -129,7 +135,7 @@ final String formatName = format.getName(); String previousValue = field.putAttribute(PER_FIELD_FORMAT_KEY, formatName); - assert previousValue == null: "formatName=" + formatName + " prevValue=" + previousValue; + assert segmentWriteState.isFieldUpdate || previousValue == null: "formatName=" + formatName + " prevValue=" + previousValue; Integer suffix; @@ -136,18 +142,23 @@ ConsumerAndSuffix consumer = formats.get(format); if (consumer == null) { // First time we are seeing this format; create a new instance - - // bump the suffix - suffix = suffixes.get(formatName); - if (suffix == null) { - suffix = 0; + + if (segmentWriteState.isFieldUpdate) { + final String suffixAtt = field.getAttribute(PER_FIELD_SUFFIX_KEY); + assert suffixAtt != null : "invalid null SUFFIX_KEY for field=\"" + field.name + "\" (field updates)"; + suffix = Integer.valueOf(suffixAtt); } else { - suffix = suffix + 1; + // bump the suffix + suffix = suffixes.get(formatName); + if (suffix == null) { + suffix = 0; + } else { + suffix = suffix + 1; + } } suffixes.put(formatName, suffix); - final String segmentSuffix = getFullSegmentSuffix(field.name, - segmentWriteState.segmentSuffix, + final String segmentSuffix = getFullSegmentSuffix(segmentWriteState.segmentSuffix, getSuffix(formatName, Integer.toString(suffix))); consumer = new ConsumerAndSuffix(); consumer.consumer = format.fieldsConsumer(new SegmentWriteState(segmentWriteState, segmentSuffix)); @@ -160,10 +171,10 @@ } previousValue = field.putAttribute(PER_FIELD_SUFFIX_KEY, Integer.toString(suffix)); - assert previousValue == null; + assert segmentWriteState.isFieldUpdate || previousValue == null : "suffix=" + Integer.toString(suffix) + " prevValue=" + previousValue; // TODO: we should only provide the "slice" of FIS - // that this PF actually sees ... + // that this DVF actually sees ... 
return consumer.consumer; } @@ -178,14 +189,11 @@ return formatName + "_" + suffix; } - static String getFullSegmentSuffix(String fieldName, String outerSegmentSuffix, String segmentSuffix) { + static String getFullSegmentSuffix(String outerSegmentSuffix, String segmentSuffix) { if (outerSegmentSuffix.length() == 0) { return segmentSuffix; } else { - // TODO: support embedding; I think it should work but - // we need a test confirm to confirm - // return outerSegmentSuffix + "_" + segmentSuffix; - throw new IllegalStateException("cannot embed PerFieldPostingsFormat inside itself (field \"" + fieldName + "\" returned PerFieldPostingsFormat)"); + return outerSegmentSuffix + "_" + segmentSuffix; } } @@ -209,7 +217,7 @@ final String suffix = fi.getAttribute(PER_FIELD_SUFFIX_KEY); assert suffix != null; DocValuesFormat format = DocValuesFormat.forName(formatName); - String segmentSuffix = getSuffix(formatName, suffix); + String segmentSuffix = getFullSegmentSuffix(readState.segmentSuffix, getSuffix(formatName, suffix)); if (!formats.containsKey(segmentSuffix)) { formats.put(segmentSuffix, format.fieldsProducer(new SegmentReadState(readState, segmentSuffix))); } Index: lucene/core/src/java/org/apache/lucene/index/BufferedDeletes.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/BufferedDeletes.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/BufferedDeletes.java (working copy) @@ -19,10 +19,11 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.search.Query; import org.apache.lucene.util.RamUsageEstimator; @@ -63,10 +64,20 @@ undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */ final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_INT + 24; + // nocommit: review! + final static int BYTES_PER_NUMERIC_UPDATE = BYTES_PER_DEL_TERM + 2*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_INT + RamUsageEstimator.NUM_BYTES_LONG; + final AtomicInteger numTermDeletes = new AtomicInteger(); + final AtomicInteger numNumericUpdates = new AtomicInteger(); final Map terms = new HashMap(); final Map queries = new HashMap(); final List docIDs = new ArrayList(); + // Map> + // LinkedHashMap because we need to preserve the order of the updates. That + // is, if two terms update the same document and same DV field, whoever came + // in last should win. LHM guarantees we iterate on the map in insertion + // order. 
+ final Map> numericUpdates = new LinkedHashMap>(); public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE); @@ -88,8 +99,8 @@ public String toString() { if (VERBOSE_DELETES) { return "gen=" + gen + " numTerms=" + numTermDeletes + ", terms=" + terms - + ", queries=" + queries + ", docIDs=" + docIDs + ", bytesUsed=" - + bytesUsed; + + ", queries=" + queries + ", docIDs=" + docIDs + ", numericUpdates=" + numericUpdates + + ", bytesUsed=" + bytesUsed; } else { String s = "gen=" + gen; if (numTermDeletes.get() != 0) { @@ -101,6 +112,9 @@ if (docIDs.size() != 0) { s += " " + docIDs.size() + " deleted docIDs"; } + if (numNumericUpdates.get() != 0) { + s += " " + numNumericUpdates.get() + " numeric updates (unique count=" + numericUpdates.size() + ")"; + } if (bytesUsed.get() != 0) { s += " bytesUsed=" + bytesUsed.get(); } @@ -145,11 +159,37 @@ } } + public void addNumericUpdate(NumericUpdate update, int docIDUpto) { + Map termUpdates = numericUpdates.get(update.term); + if (termUpdates == null) { + termUpdates = new HashMap(); + numericUpdates.put(update.term, termUpdates); + } + final NumericUpdate current = termUpdates.get(update.field); + if (current != null && docIDUpto < current.docIDUpto) { + // Only record the new number if it's greater than or equal to the current + // one. This is important because if multiple threads are replacing the + // same doc at nearly the same time, it's possible that one thread that + // got a higher docID is scheduled before the other threads. + return; + } + + update.docIDUpto = docIDUpto; + termUpdates.put(update.field, update); + numNumericUpdates.incrementAndGet(); + if (current == null) { + // nocommit: review! + bytesUsed.addAndGet(BYTES_PER_NUMERIC_UPDATE + update.term.bytes.length + (RamUsageEstimator.NUM_BYTES_CHAR * update.term.field().length())); + } + } + void clear() { terms.clear(); queries.clear(); docIDs.clear(); + numericUpdates.clear(); numTermDeletes.set(0); + numNumericUpdates.set(0); bytesUsed.set(0); } @@ -159,6 +199,6 @@ } boolean any() { - return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0; + return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0 || numericUpdates.size() > 0; } } Index: lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (working copy) @@ -18,10 +18,11 @@ */ import java.io.IOException; -import java.util.List; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; -import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -114,6 +115,7 @@ } public static class ApplyDeletesResult { + // True if any actual deletes took place: public final boolean anyDeletes; @@ -123,10 +125,14 @@ // If non-null, contains segments that are 100% deleted public final List allDeleted; - ApplyDeletesResult(boolean anyDeletes, long gen, List allDeleted) { + // True if any actual numeric docvalues updates took place + public final boolean anyNumericDVUpdates; + + ApplyDeletesResult(boolean anyDeletes, long gen, List allDeleted, boolean anyNumericDVUpdates) { this.anyDeletes = anyDeletes; this.gen = gen; this.allDeleted = allDeleted; + this.anyNumericDVUpdates = anyNumericDVUpdates; } } @@ -145,7 
+151,7 @@ final long t0 = System.currentTimeMillis(); if (infos.size() == 0) { - return new ApplyDeletesResult(false, nextGen++, null); + return new ApplyDeletesResult(false, nextGen++, null, false); } assert checkDeleteStats(); @@ -154,7 +160,7 @@ if (infoStream.isEnabled("BD")) { infoStream.message("BD", "applyDeletes: no deletes; skipping"); } - return new ApplyDeletesResult(false, nextGen++, null); + return new ApplyDeletesResult(false, nextGen++, null, false); } if (infoStream.isEnabled("BD")) { @@ -169,6 +175,7 @@ CoalescedDeletes coalescedDeletes = null; boolean anyNewDeletes = false; + boolean anyNewUpdates = false; int infosIDX = infos2.size()-1; int delIDX = deletes.size()-1; @@ -206,7 +213,7 @@ // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); final ReadersAndLiveDocs rld = readerPool.get(info, true); - final SegmentReader reader = rld.getReader(IOContext.READ); + final SegmentReader reader = rld.getReader(false, IOContext.READ); // don't apply deletes, as we're about to add more! int delCount = 0; final boolean segAllDeletes; try { @@ -214,11 +221,13 @@ //System.out.println(" del coalesced"); delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); + anyNewUpdates |= applyNumericDocValueUpdates(coalescedDeletes.numericDVUpdates, rld, reader); } //System.out.println(" del exact"); // Don't delete by Term here; DocumentsWriterPerThread // already did that on flush: delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader); + anyNewUpdates |= applyNumericDocValueUpdates(Arrays.asList(packet.updates), rld, reader); final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); segAllDeletes = fullDelCount == rld.info.info.getDocCount(); @@ -259,12 +268,13 @@ // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); final ReadersAndLiveDocs rld = readerPool.get(info, true); - final SegmentReader reader = rld.getReader(IOContext.READ); + final SegmentReader reader = rld.getReader(false, IOContext.READ); // don't apply deletes, as we're about to add more! int delCount = 0; final boolean segAllDeletes; try { delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); + anyNewUpdates |= applyNumericDocValueUpdates(coalescedDeletes.numericDVUpdates, rld, reader); final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); segAllDeletes = fullDelCount == rld.info.info.getDocCount(); @@ -297,7 +307,7 @@ } // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any; - return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted); + return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted, anyNewUpdates); } synchronized long getNextGen() { @@ -402,15 +412,15 @@ if (docID == DocIdSetIterator.NO_MORE_DOCS) { break; } + if (!any) { + rld.initWritableLiveDocs(); + any = true; + } // NOTE: there is no limit check on the docID // when deleting by Term (unlike by Query) // because on flush we apply all Term deletes to // each segment. 
So all Term deleting here is // against prior segments: - if (!any) { - rld.initWritableLiveDocs(); - any = true; - } if (rld.delete(docID)) { delCount++; } @@ -422,6 +432,65 @@ return delCount; } + // NumericDocValue Updates + private synchronized boolean applyNumericDocValueUpdates(Iterable updates, ReadersAndLiveDocs rld, SegmentReader reader) throws IOException { + Fields fields = reader.fields(); + if (fields == null) { + // This reader has no postings + return false; + } + + TermsEnum termsEnum = null; + DocsEnum docs = null; + boolean any = false; + //System.out.println(Thread.currentThread().getName() + " numericDVUpdate reader=" + reader); + for (NumericUpdate update : updates) { + Term term = update.term; + int limit = update.docIDUpto; + + // TODO: we rely on the map being ordered by updates order, not by terms order. + // we need that so that if two terms update the same document, the one that came + // last wins. + // alternatively, we could keep a map from doc->lastUpto and apply the update + // in terms order, where an update is applied only if its docIDUpto is greater + // than lastUpto. + // but, since app can send two updates, in order, which will have same upto, we + // cannot rely solely on docIDUpto, and need to have our own gen, which is + // incremented with every update. + + // Unlike applyTermDeletes, we visit terms in update order, not term order. + // Therefore we cannot assume we can only seek forwards and must ask for a + // new TermsEnum + Terms terms = fields.terms(term.field); + if (terms == null) { // no terms in that field + termsEnum = null; + continue; + } + + termsEnum = terms.iterator(termsEnum); + + // System.out.println(" term=" + term); + + if (termsEnum.seekExact(term.bytes())) { + // we don't need term frequencies for this + DocsEnum docsEnum = termsEnum.docs(rld.getLiveDocs(), docs, DocsEnum.FLAG_NONE); + + //System.out.println("BDS: got docsEnum=" + docsEnum); + + int doc; + while ((doc = docsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + //System.out.println(Thread.currentThread().getName() + " numericDVUpdate term=" + term + " doc=" + docID); + if (doc >= limit) { + break; // no more docs that can be updated for this term + } + rld.updateNumericDocValue(update.field, doc, update.value); + any = true; + } + } + } + return any; + } + public static class QueryAndLimit { public final Query query; public final int limit; @@ -476,7 +545,7 @@ } // only for assert - private boolean checkDeleteStats() { + private boolean checkDeleteStats() { // nocommit add updates too? 
int numTerms2 = 0; long bytesUsed2 = 0; for(FrozenBufferedDeletes packet : deletes) { Index: lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java (working copy) @@ -29,20 +29,27 @@ class CoalescedDeletes { final Map queries = new HashMap(); final List> iterables = new ArrayList>(); - + final List numericDVUpdates = new ArrayList(); + @Override public String toString() { // note: we could add/collect more debugging information - return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ")"; + return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ",numericUpdates=" + numericDVUpdates.size() + ")"; } void update(FrozenBufferedDeletes in) { iterables.add(in.termsIterable()); - for(int queryIdx=0;queryIdx termsIterable() { Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) @@ -158,6 +158,15 @@ return applyAllDeletes( deleteQueue); } + synchronized void updateNumericDocValue(Term term, String field, Long value) throws IOException { + final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; + deleteQueue.addNumericUpdate(new NumericUpdate(term, field, value)); + flushControl.doOnDelete(); + if (flushControl.doApplyAllDeletes()) { + applyAllDeletes(deleteQueue); + } + } + DocumentsWriterDeleteQueue currentDeleteSession() { return deleteQueue; } Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (working copy) @@ -107,6 +107,11 @@ tryApplyGlobalSlice(); } + void addNumericUpdate(NumericUpdate update) { + add(new NumericUpdateNode(update)); + tryApplyGlobalSlice(); + } + /** * invariant for document update */ @@ -380,7 +385,23 @@ } } + private static final class NumericUpdateNode extends Node { + NumericUpdateNode(NumericUpdate update) { + super(update); + } + + @Override + void apply(BufferedDeletes bufferedDeletes, int docIDUpto) { + bufferedDeletes.addNumericUpdate(item, docIDUpto); + } + + @Override + public String toString() { + return "update=" + item; + } + } + private boolean forceApplyGlobalSlice() { globalBufferLock.lock(); final Node currentTail = tail; Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (working copy) @@ -17,9 +17,11 @@ * limitations under the License. 
*/ +import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK; +import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE; + import java.io.IOException; import java.text.NumberFormat; -import java.util.Collection; import java.util.HashSet; import java.util.Locale; import java.util.Set; @@ -41,9 +43,6 @@ import org.apache.lucene.util.MutableBits; import org.apache.lucene.util.RamUsageEstimator; -import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK; -import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE; - class DocumentsWriterPerThread { /** @@ -174,8 +173,9 @@ final DocConsumer consumer; final Counter bytesUsed; - //Deletes for our still-in-RAM (to be flushed next) segment - final BufferedDeletes pendingDeletes; + SegmentWriteState flushState; + // Deletes for our still-in-RAM (to be flushed next) segment + final BufferedDeletes pendingDeletes; private final SegmentInfo segmentInfo; // Current segment we are working on boolean aborting = false; // True if an abort is pending boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting @@ -467,7 +467,7 @@ pendingDeletes.terms.clear(); segmentInfo.setFiles(new HashSet(directory.getCreatedFiles())); - final SegmentInfoPerCommit segmentInfoPerCommit = new SegmentInfoPerCommit(segmentInfo, 0, -1L); + final SegmentInfoPerCommit segmentInfoPerCommit = new SegmentInfoPerCommit(segmentInfo, 0, -1L, -1L); if (infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : (flushState.segmentInfo.getDocCount() - flushState.delCountOnFlush)) + " deleted docs"); infoStream.message("DWPT", "new segment has " + @@ -481,7 +481,8 @@ } final BufferedDeletes segmentDeletes; - if (pendingDeletes.queries.isEmpty()) { + if (pendingDeletes.queries.isEmpty() && pendingDeletes.numericUpdates.isEmpty()) { + pendingDeletes.clear(); segmentDeletes = null; } else { segmentDeletes = pendingDeletes; Index: lucene/core/src/java/org/apache/lucene/index/FieldInfo.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/FieldInfo.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/FieldInfo.java (working copy) @@ -79,7 +79,7 @@ * Character offsets are encoded alongside the positions. */ DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, - }; + } /** * DocValues types. @@ -110,7 +110,7 @@ * ordinal and by-value. Values must be <= 32766 bytes. */ SORTED_SET - }; + } /** * Sole Constructor. Index: lucene/core/src/java/org/apache/lucene/index/FieldInfos.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/FieldInfos.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/FieldInfos.java (working copy) @@ -223,6 +223,20 @@ (dvType == null || docValuesType.get(name) == null || dvType == docValuesType.get(name)); } + /** + * Returns true if the {@code fieldName} exists in the map and is of the + * same {@code dvType}. 
+ */ + synchronized boolean contains(String fieldName, DocValuesType dvType) { + // used by IndexWriter.updateNumericDocValue + if (!nameToNumber.containsKey(fieldName)) { + return false; + } else { + // only return true if the field has the same dvType as the requested one + return dvType == docValuesType.get(fieldName); + } + } + synchronized void clear() { numberToName.clear(); nameToNumber.clear(); Index: lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (working copy) @@ -17,13 +17,15 @@ * limitations under the License. */ +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; +import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit; import org.apache.lucene.search.Query; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.RamUsageEstimator; -import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit; /** Holds buffered deletes by term or query, once pushed. * Pushed deletes are write-once, so we shift to more @@ -34,15 +36,21 @@ /* Query we often undercount (say 24 bytes), plus int. */ final static int BYTES_PER_DEL_QUERY = RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_INT + 24; + + // nocommit review! + final static int BYTES_PER_NUMERIC_UPDATE = RamUsageEstimator.NUM_BYTES_OBJECT_REF + BufferedDeletes.BYTES_PER_NUMERIC_UPDATE; // Terms, in sorted order: final PrefixCodedTerms terms; int termCount; // just for debugging - // Parallel array of deleted query, and the docIDUpto for - // each + // Parallel array of deleted query, and the docIDUpto for each final Query[] queries; final int[] queryLimits; + + // numeric DV update term and their updates + final NumericUpdate[] updates; + final int bytesUsed; final int numTermDeletes; private long gen = -1; // assigned by BufferedDeletesStream once pushed @@ -72,7 +80,15 @@ upto++; } - bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY; + List allUpdates = new ArrayList(); + for (Map fieldUpdates : deletes.numericUpdates.values()) { + for (NumericUpdate update : fieldUpdates.values()) { + allUpdates.add(update); + } + } + updates = allUpdates.toArray(new NumericUpdate[allUpdates.size()]); + + bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY + updates.length * BYTES_PER_NUMERIC_UPDATE; numTermDeletes = deletes.numTermDeletes.get(); } @@ -140,6 +156,6 @@ } boolean any() { - return termCount > 0 || queries.length > 0; + return termCount > 0 || queries.length > 0 || updates.length > 0; } } Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -36,6 +36,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.FieldInfo.DocValuesType; import org.apache.lucene.index.FieldInfos.FieldNumbers; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.index.MergePolicy.MergeTrigger; @@ -532,8 +533,11 @@ // Make sure we only write del docs for a live segment: assert infoIsLive(info); // Must 
checkpoint w/ deleter, because we just - // created created new _X_N.del file. + // created new _X_N.del file. deleter.checkpoint(segmentInfos, false); + + // we wrote liveDocs, reopen the reader + rld.reopenReader(IOContext.READ); } } } @@ -1532,6 +1536,40 @@ } } + /** + * Updates a document's NumericDocValue for {@code field} to the value + * {@code value}. This method can be used to 'unset' a document's value + * by passing {@code null} as the new value. Also, you can only update fields + * that already exist in the index, not add new fields through this method. + * + * NOTE: if this method hits an OutOfMemoryError you should immediately + * close the writer. See above for details.
+ * + * @param term + * the term to identify the document(s) to be updated + * @param field + * field name of the NumericDocValues field + * @param value + * new value for the field + * @throws CorruptIndexException + * if the index is corrupt + * @throws IOException + * if there is a low-level IO error + */ + public void updateNumericDocValue(Term term, String field, Long value) throws IOException { + ensureOpen(); + if (!globalFieldNumberMap.contains(field, DocValuesType.NUMERIC)) { + throw new IllegalArgumentException("can only update existing numeric-docvalues fields!"); + } + try { + docWriter.updateNumericDocValue(term, field, value); + } catch (OutOfMemoryError oom) { + handleOOM(oom, "updateNumericDocValue"); + } + } + // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); @@ -2512,7 +2550,7 @@ } } - SegmentInfoPerCommit infoPerCommit = new SegmentInfoPerCommit(info, 0, -1L); + SegmentInfoPerCommit infoPerCommit = new SegmentInfoPerCommit(info, 0, -1L, -1L); info.setFiles(new HashSet(trackingDir.getCreatedFiles())); trackingDir.getCreatedFiles().clear(); @@ -2599,7 +2637,7 @@ SegmentInfo newInfo = new SegmentInfo(directory, info.info.getVersion(), segName, info.info.getDocCount(), info.info.getUseCompoundFile(), info.info.getCodec(), info.info.getDiagnostics(), attributes); - SegmentInfoPerCommit newInfoPerCommit = new SegmentInfoPerCommit(newInfo, info.getDelCount(), info.getDelGen()); + SegmentInfoPerCommit newInfoPerCommit = new SegmentInfoPerCommit(newInfo, info.getDelCount(), info.getDelGen(), info.getDocValuesGen()); Set segFiles = new HashSet(); @@ -3011,7 +3049,7 @@ flushDeletesCount.incrementAndGet(); final BufferedDeletesStream.ApplyDeletesResult result; result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos.asList()); - if (result.anyDeletes) { + if (result.anyDeletes || result.anyNumericDVUpdates) { checkpoint(); } if (!keepFullyDeletedSegments && result.allDeleted != null) { @@ -3072,6 +3110,7 @@ * saves the resulting deletes file (incrementing the * delete generation for merge.info). If no deletes were * flushed, no new deletes file is saved. 
*/ + // nocommit (RENAME) to commitMergedDeletesAndUpdates and update jdocs synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException { assert testPoint("startCommitMergeDeletes"); @@ -3088,20 +3127,30 @@ long minGen = Long.MAX_VALUE; // Lazy init (only when we find a delete to carry over): - ReadersAndLiveDocs mergedDeletes = null; + ReadersAndLiveDocs mergedDeletes = null; // nocommit (RENAME) to mergedDeletesAndUpdates MergePolicy.DocMap docMap = null; - for(int i=0; i < sourceSegments.size(); i++) { + for (int i = 0; i < sourceSegments.size(); i++) { SegmentInfoPerCommit info = sourceSegments.get(i); minGen = Math.min(info.getBufferedDeletesGen(), minGen); final int docCount = info.info.getDocCount(); final Bits prevLiveDocs = merge.readers.get(i).getLiveDocs(); - final Bits currentLiveDocs; final ReadersAndLiveDocs rld = readerPool.get(info, false); // We hold a ref so it should still be in the pool: assert rld != null: "seg=" + info.info.name; - currentLiveDocs = rld.getLiveDocs(); + final Bits currentLiveDocs = rld.getLiveDocs(); + if (rld.hasMergingUpdates()) { + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + mergedDeletes.initWritableLiveDocs(); + // don't need that for updates, but perhaps there are deletes too + docMap = merge.getDocMap(mergeState); + assert docMap.isConsistent(merge.info.info.getDocCount()); + } + rld.copyMergingUpdates(mergedDeletes, mergeState.docMaps[i], mergeState.docBase[i]); + } + if (prevLiveDocs != null) { // If we had deletions on starting the merge we must @@ -3123,11 +3172,10 @@ // If so, we must carefully merge the liveDocs one // doc at a time: if (currentLiveDocs != prevLiveDocs) { - // This means this segment received new deletes // since we started the merge, so we // must merge them: - for(int j=0;j BD final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, merge.segments); - if (result.anyDeletes) { + if (result.anyDeletes || result.anyNumericDVUpdates) { checkpoint(); } @@ -3556,7 +3604,7 @@ details.put("mergeMaxNumSegments", "" + merge.maxNumSegments); details.put("mergeFactor", Integer.toString(merge.segments.size())); setDiagnostics(si, SOURCE_MERGE, details); - merge.setInfo(new SegmentInfoPerCommit(si, 0, -1L)); + merge.setInfo(new SegmentInfoPerCommit(si, 0, -1L, -1L)); // Lock order: IW -> BD bufferedDeletesStream.prune(segmentInfos); @@ -3598,7 +3646,7 @@ // exception inside mergeInit if (merge.registerDone) { final List sourceSegments = merge.segments; - for(SegmentInfoPerCommit info : sourceSegments) { + for (SegmentInfoPerCommit info : sourceSegments) { mergingSegments.remove(info); } merge.registerDone = false; @@ -3623,6 +3671,7 @@ if (drop) { rld.dropChanges(); } + rld.setMerging(false); rld.release(sr); readerPool.release(rld); if (drop) { @@ -3680,9 +3729,13 @@ // Hold onto the "live" reader; we will use this to // commit merged deletes final ReadersAndLiveDocs rld = readerPool.get(info, true); - SegmentReader reader = rld.getReader(context); + SegmentReader reader = rld.getReader(true, context); assert reader != null; + // Notify that we are merging, so that we can later copy the updates + // that were received while merging to the merged segment. 
+ rld.setMerging(true); + // Carefully pull the most recent live docs: final Bits liveDocs; final int delCount; @@ -3877,7 +3930,7 @@ final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer(); if (poolReaders && mergedSegmentWarmer != null && merge.info.info.getDocCount() != 0) { final ReadersAndLiveDocs rld = readerPool.get(merge.info, true); - final SegmentReader sr = rld.getReader(IOContext.READ); + final SegmentReader sr = rld.getReader(true, IOContext.READ); try { mergedSegmentWarmer.warm(sr); } finally { Index: lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java (working copy) @@ -0,0 +1,49 @@ +package org.apache.lucene.index; + +import org.apache.lucene.document.NumericDocValuesField; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** An in-place update to a numeric docvalues field */ +final class NumericUpdate { + + static final Long MISSING = new Long(0); + + Term term; + String field; + Long value; + int docIDUpto = -1; // unassigned until applied, and confusing that it's here, when it's just used in BufferedDeletes... + + /** + * Constructor. + * + * @param term the {@link Term} which determines the documents that will be updated + * @param field the {@link NumericDocValuesField} to update + * @param value the updated value + */ + NumericUpdate(Term term, String field, Long value) { + this.term = term; + this.field = field; + this.value = value == null ? 
MISSING : value; + } + + @Override + public String toString() { + return "term=" + term + ",field=" + field + ",value=" + value; + } +} Index: lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (working copy) @@ -18,19 +18,30 @@ */ import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.LiveDocsFormat; +import org.apache.lucene.index.FieldInfo.DocValuesType; +import org.apache.lucene.index.MergeState.DocMap; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.MutableBits; // Used by IndexWriter to hold open SegmentReaders (for -// searching or merging), plus pending deletes, +// searching or merging), plus pending deletes, and updates // for a given segment -class ReadersAndLiveDocs { +class ReadersAndLiveDocs { // nocommit (RENAME) to ReaderAndUpdates? // Not final because we replace (clone) when we need to // change it and it's been shared: public final SegmentInfoPerCommit info; @@ -43,13 +54,16 @@ // Set once (null, and then maybe set, and never set again): private SegmentReader reader; - // Holds the current shared (readable and writable - // liveDocs). This is null when there are no deleted + // Holds the current shared (readable and writable) + // liveDocs. This is null when there are no deleted // docs, and it's copy-on-write (cloned whenever we need // to change it but it's been shared to an external NRT // reader). private Bits liveDocs; + // Holds the numeric DocValues updates. + private final Map> numericUpdates = new HashMap>(); + // How many further deletions we've done against // liveDocs vs when we loaded it or last wrote it: private int pendingDeleteCount; @@ -56,12 +70,22 @@ // True if the current liveDocs is referenced by an // external NRT reader: - private boolean shared; + private boolean liveDocsShared; + // Indicates whether this segment is currently being merged. While a segment + // is merging, all field updates are also registered in the mergingUpdates + // map. Also, calls to writeLiveDocs merge the updates with mergingUpdates. + // That way, when the segment is done merging, IndexWriter can apply the + // updates on the merged segment too. + private boolean isMerging = false; + + // Holds any updates that come through while this segment was being merged. 
+ private final Map> mergingUpdates = new HashMap>(); + public ReadersAndLiveDocs(IndexWriter writer, SegmentInfoPerCommit info) { this.info = info; this.writer = writer; - shared = true; + liveDocsShared = true; } public void incRef() { @@ -83,6 +107,14 @@ public synchronized int getPendingDeleteCount() { return pendingDeleteCount; } + + public synchronized boolean hasFieldUpdates() { + return numericUpdates.size() > 0; + } + + public synchronized boolean hasMergingUpdates() { + return mergingUpdates.size() > 0; + } // Call only from assert! public synchronized boolean verifyDocCounts() { @@ -102,35 +134,89 @@ return true; } - // Get reader for searching/deleting - public synchronized SegmentReader getReader(IOContext context) throws IOException { - //System.out.println(" livedocs=" + rld.liveDocs); - - if (reader == null) { - // We steal returned ref: - reader = new SegmentReader(info, context); - if (liveDocs == null) { - liveDocs = reader.getLiveDocs(); + public synchronized void reopenReader(IOContext context) throws IOException { + if (reader != null) { + // nocommit when DVProducers move from SCR to SR, use a ctor which shares from previous SR + SegmentReader newReader = new SegmentReader(info, context); + boolean reopened = false; + try { + reader.decRef(); + reader = newReader; + if (liveDocs == null) { + liveDocs = reader.getLiveDocs(); + } + reopened = true; + } finally { + if (!reopened) { + newReader.decRef(); + } } - //System.out.println("ADD seg=" + rld.info + " isMerge=" + isMerge + " " + readerMap.size() + " in pool"); - //System.out.println(Thread.currentThread().getName() + ": getReader seg=" + info.name); } - - // Ref for caller - reader.incRef(); - return reader; } - + + /** Returns a {@link SegmentReader} while applying field updates if requested. */ + public SegmentReader getReader(boolean applyFieldUpdates, IOContext context) throws IOException { + boolean checkpoint = false; + try { + // don't synchronize the entire method because we cannot call + // writer.checkpoint() while holding the RLD lock, otherwise we might hit + // a deadlock w/ e.g. a concurrent merging thread. + synchronized (this) { + if (reader == null) { + if (applyFieldUpdates && hasFieldUpdates()) { + if (writeLiveDocs(info.info.dir)) { + checkpoint = true; + } + } + // We steal returned ref: + reader = new SegmentReader(info, context); + if (liveDocs == null) { + liveDocs = reader.getLiveDocs(); + } + } else if (applyFieldUpdates && hasFieldUpdates()) { + // enroll a new reader with the applied updates + if (writeLiveDocs(info.info.dir)) { + checkpoint = true; + } + reopenReader(context); + } + + // Ref for caller + reader.incRef(); + return reader; + } + } finally { + if (checkpoint) { + writer.checkpoint(); + } + } + } + public synchronized void release(SegmentReader sr) throws IOException { assert info == sr.getSegmentInfo(); sr.decRef(); } + /** + * Updates the numeric doc value of {@code docID} under {@code field} to the + * given {@code value}. 
+ */ + public synchronized void updateNumericDocValue(String field, int docID, Long value) { + assert Thread.holdsLock(writer); + assert docID >= 0 && docID < reader.maxDoc() : "out of bounds: docid=" + docID + " maxDoc=" + reader.maxDoc() + " seg=" + info.info.name + " docCount=" + info.info.getDocCount(); + Map updates = numericUpdates.get(field); + if (updates == null) { + updates = new HashMap(); + numericUpdates.put(field, updates); + } + updates.put(docID, value); + } + public synchronized boolean delete(int docID) { assert liveDocs != null; assert Thread.holdsLock(writer); assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " docCount=" + info.info.getDocCount(); - assert !shared; + assert !liveDocsShared; final boolean didDelete = liveDocs.get(docID); if (didDelete) { ((MutableBits) liveDocs).clear(docID); @@ -162,11 +248,9 @@ * it when you're done (ie, do not call release()). */ public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException { - if (reader == null) { - getReader(context).decRef(); - assert reader != null; - } - shared = true; + getReader(true, context).decRef(); // make sure we enroll a new reader if there are field updates + assert reader != null; + liveDocsShared = true; if (liveDocs != null) { return new SegmentReader(reader.getSegmentInfo(), reader.core, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount); } else { @@ -180,7 +264,7 @@ assert Thread.holdsLock(writer); assert info.info.getDocCount() > 0; //System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared); - if (shared) { + if (liveDocsShared) { // Copy on write: this means we've cloned a // SegmentReader sharing the current liveDocs // instance; must now make a private clone so we can @@ -192,9 +276,7 @@ } else { liveDocs = liveDocsFormat.newLiveDocs(liveDocs); } - shared = false; - } else { - assert liveDocs != null; + liveDocsShared = false; } } @@ -206,7 +288,7 @@ public synchronized Bits getReadOnlyLiveDocs() { //System.out.println("getROLiveDocs seg=" + info); assert Thread.holdsLock(writer); - shared = true; + liveDocsShared = true; //if (liveDocs != null) { //System.out.println(" liveCount=" + liveDocs.count()); //} @@ -227,56 +309,248 @@ // Commit live docs to the directory (writes new // _X_N.del files); returns true if it wrote the file // and false if there were no new deletes to write: + // nocommit (RENAME) to applyDeletesAndUpdates public synchronized boolean writeLiveDocs(Directory dir) throws IOException { //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount); - if (pendingDeleteCount != 0) { - // We have new deletes + final boolean hasFieldUpdates = hasFieldUpdates(); + if (pendingDeleteCount == 0 && !hasFieldUpdates) { + return false; + } + + // We have new deletes or updates + if (pendingDeleteCount > 0) { assert liveDocs.length() == info.info.getDocCount(); + } + + // Do this so we can delete any created files on + // exception; this saves all codecs from having to do + // it: + TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); + + // We can write directly to the actual name (vs to a + // .tmp & renaming it) because the file is not live + // until segments file is written: + boolean success = false; + try { + Codec codec = info.info.getCodec(); + if (pendingDeleteCount > 0) { + 
codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT); + } + + // apply numeric updates if there are any + if (hasFieldUpdates) { + // reader could be null e.g. for a just merged segment (from + // IndexWriter.commitMergedDeletes). + final SegmentReader reader = this.reader == null ? new SegmentReader(info, IOContext.READONCE) : this.reader; + try { + // clone FieldInfos so that we can update their numericUpdatesGen + // separately from the reader's infos and write them to a new + // fieldInfos_gen file + FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap); + // cannot use builder.add(reader.getFieldInfos()) because it does not clone FI.attributes + for (FieldInfo fi : reader.getFieldInfos()) { + FieldInfo clone = builder.add(fi); + // copy the stuff FieldInfos.Builder doesn't copy + if (fi.attributes() != null) { + for (Entry e : fi.attributes().entrySet()) { + clone.putAttribute(e.getKey(), e.getValue()); + } + } + } + // create new fields or update existing ones to have NumericDV type +// for (String f : numericUpdates.keySet()) { +// builder.addOrUpdate(f, NumericDocValuesField.TYPE); +// } + + final FieldInfos fieldInfos = builder.finish(); + final long nextDocValuesGen = info.getNextDocValuesGen(); + final String segmentSuffix = Long.toString(nextDocValuesGen, Character.MAX_RADIX); + final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, IOContext.DEFAULT, segmentSuffix, true); + final DocValuesFormat docValuesFormat = codec.docValuesFormat(); + final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state); + boolean fieldsConsumerSuccess = false; + try { + for (Entry> e : numericUpdates.entrySet()) { + final String field = e.getKey(); + final Map updates = e.getValue(); + final FieldInfo fieldInfo = fieldInfos.fieldInfo(field); - // Do this so we can delete any created files on - // exception; this saves all codecs from having to do - // it: - TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); + if (fieldInfo == null || fieldInfo.getDocValuesType() != DocValuesType.NUMERIC) { + throw new UnsupportedOperationException( + "cannot update docvalues in a segment with no docvalues field: segment=" + info + ", field=" + field); + } +// assert fieldInfo != null; - // We can write directly to the actual name (vs to a - // .tmp & renaming it) because the file is not live - // until segments file is written: - boolean success = false; - try { - info.info.getCodec().liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT); - success = true; - } finally { - if (!success) { - // Advance only the nextWriteDelGen so that a 2nd - // attempt to write will write to a new file - info.advanceNextWriteDelGen(); + info.setDocValuesGen(fieldInfo.number, nextDocValuesGen); + + // write the numeric updates to a new gen'd docvalues file + fieldsConsumer.addNumericField(fieldInfo, new Iterable() { + @SuppressWarnings("synthetic-access") + final NumericDocValues currentValues = reader.getNumericDocValues(field); + @Override + public Iterator iterator() { + return new Iterator() { - // Delete any partially created file(s): - for(String fileName : trackingDir.getCreatedFiles()) { - try { - dir.deleteFile(fileName); - } catch (Throwable t) { - // Ignore so we throw only the first exc + @SuppressWarnings("synthetic-access") + final int maxDoc = reader.maxDoc(); + int curDoc = -1; + + 
@Override + public boolean hasNext() { + return curDoc < maxDoc - 1; + } + + @Override + public Number next() { + if (++curDoc >= maxDoc) { + throw new NoSuchElementException("no more documents to return values for"); + } + Long updatedValue = updates.get(curDoc); + if (updatedValue == null) { + updatedValue = Long.valueOf(currentValues.get(curDoc)); + } else if (updatedValue == NumericUpdate.MISSING) { + updatedValue = null; + } + return updatedValue; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("this iterator does not support removing elements"); + } + + }; + } + }); } + fieldsConsumerSuccess = true; + } finally { + if (fieldsConsumerSuccess) { + fieldsConsumer.close(); + } else { + IOUtils.closeWhileHandlingException(fieldsConsumer); + } } + } finally { + if (reader != this.reader) { + reader.close(); + } } } - - // If we hit an exc in the line above (eg disk full) - // then info's delGen remains pointing to the previous - // (successfully written) del docs: + success = true; + } finally { + if (!success) { + // Advance only the nextWriteDelGen so that a 2nd + // attempt to write will write to a new file + if (pendingDeleteCount > 0) { + info.advanceNextWriteDelGen(); + } + + // Advance only the nextWriteDocValuesGen so that a 2nd + // attempt to write will write to a new file + if (hasFieldUpdates) { + info.advanceNextWriteDocValuesGen(); + } + + // Delete any partially created file(s): + for (String fileName : trackingDir.getCreatedFiles()) { + try { + dir.deleteFile(fileName); + } catch (Throwable t) { + // Ignore so we throw only the first exc + } + } + } + } + + // If we hit an exc in the line above (eg disk full) + // then info's delGen remains pointing to the previous + // (successfully written) del docs: + if (pendingDeleteCount > 0) { info.advanceDelGen(); info.setDelCount(info.getDelCount() + pendingDeleteCount); - pendingDeleteCount = 0; - return true; - } else { - return false; } + + if (hasFieldUpdates) { + info.advanceDocValuesGen(); + // copy all the updates to mergingUpdates, so they can later be applied to the merged segment + if (isMerging) { + copyUpdatesToMerging(); + } + numericUpdates.clear(); + } + + info.addUpdatesFiles(trackingDir.getCreatedFiles()); + + return true; } + private void copyUpdatesToMerging() { + for (Entry> e : numericUpdates.entrySet()) { + String field = e.getKey(); + Map merging = mergingUpdates.get(field); + if (merging == null) { + mergingUpdates.put(field, new HashMap(e.getValue())); + } else { + merging.putAll(e.getValue()); + } + } + } + + /** + * Indicates whether this segment is currently being merged. Call this just + * before the segment is being merged with {@code true} and when the merge has + * finished and all updates have been applied to the merged segment, call this + * with {@code false}. + */ + public synchronized void setMerging(boolean isMerging) { + this.isMerging = isMerging; + if (!isMerging) { + mergingUpdates.clear(); + } + } + + /** + * Copies the updates that were accumulated while the segment was merging, + * using the given {@link DocMap} and {@code docBase} to map the documents. 
+ */ + public synchronized void copyMergingUpdates(ReadersAndLiveDocs merged, DocMap docMap, int docBase) { + if (hasFieldUpdates()) { + copyUpdatesToMerging(); + } + for (Entry> e : mergingUpdates.entrySet()) { + Map mergedUpdates = merged.numericUpdates.get(e.getKey()); + if (mergedUpdates == null) { + mergedUpdates = new HashMap(); + merged.numericUpdates.put(e.getKey(), mergedUpdates); + } + for (Entry update : e.getValue().entrySet()) { + int doc = update.getKey().intValue(); + doc = docMap.get(doc); + if (doc != -1) { // doc not deleted + doc += docBase; +// System.out.println("updating doc " + doc + " to " + update.getValue()); + mergedUpdates.put(doc, update.getValue()); + } + } + } +// System.out.println("merged.numericUpdates=" + merged.numericUpdates); + mergingUpdates.clear(); + } + @Override public String toString() { - return "ReadersAndLiveDocs(seg=" + info + " pendingDeleteCount=" + pendingDeleteCount + " shared=" + shared + ")"; + StringBuilder sb = new StringBuilder(); + sb.append("ReadersAndLiveDocs(seg=").append(info); + sb.append(" pendingDeleteCount=").append(pendingDeleteCount); + sb.append(" liveDocsShared=").append(liveDocsShared); + int pendingUpdatesCount = 0; + for (Entry> e : numericUpdates.entrySet()) { + pendingUpdatesCount += e.getValue().size(); + } + sb.append(" pendingUpdatesCount=").append(pendingUpdatesCount); + return sb.toString(); } + } Index: lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java (working copy) @@ -18,17 +18,21 @@ */ import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.FieldsProducer; import org.apache.lucene.codecs.PostingsFormat; -import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.codecs.TermVectorsReader; import org.apache.lucene.index.FieldInfo.DocValuesType; @@ -44,11 +48,11 @@ * SegmentReader is cloned or reopened */ final class SegmentCoreReaders { - // Counts how many other reader share the core objects + // Counts how many other readers share the core objects // (freqStream, proxStream, tis, etc.) of this reader; // when coreRef drops to 0, these core objects may be // closed. 
A given instance of SegmentReader may be - closed, even those it shares core objects with other + closed, even though it shares core objects with other + SegmentReaders: private final AtomicInteger ref = new AtomicInteger(1); @@ -55,7 +59,8 @@ final FieldInfos fieldInfos; final FieldsProducer fields; - final DocValuesProducer dvProducer; + final Map dvProducers = new HashMap(); + final Map genDVProducers = new HashMap(); final DocValuesProducer normsProducer; private final SegmentReader owner; @@ -120,8 +125,9 @@ cfsReader = null; cfsDir = dir; } + fieldInfos = codec.fieldInfosFormat().getFieldInfosReader().read(cfsDir, si.info.name, IOContext.READONCE); - + final PostingsFormat format = codec.postingsFormat(); final SegmentReadState segmentReadState = new SegmentReadState(cfsDir, si.info, fieldInfos, context); // Ask codec for its Fields @@ -131,11 +137,54 @@ // TODO: since we don't write any norms file if there are no norms, // kinda jaky to assume the codec handles the case of no norms file at all gracefully?! + // nocommit SegCoreReader represents the stuff that doesn't change between commits. + // Currently ReaderAndLiveDocs.getReadOnlyClone creates a new + // SegCoreReader, which means DVProducers aren't shared, which is very + // costly. if (fieldInfos.hasDocValues()) { - dvProducer = codec.docValuesFormat().fieldsProducer(segmentReadState); - assert dvProducer != null; - } else { - dvProducer = null; + final DocValuesFormat dvFormat = codec.docValuesFormat(); + // initialize the per generation numericDVProducers and put the correct + // DVProducer for each field + // note that while DVF.fieldsProducer takes a gen, we group all the + // fields with the same gen so that a Codec "sees" only those fields. + // otherwise, PerFieldDVF loads all given FieldInfos, irrespective of + // their dvUpdateGen compared to the asked gen. 
+ final Map> genInfos = new HashMap>(); + for (FieldInfo fi : fieldInfos) { + if (fi.getDocValuesType() == null) { + continue; + } + long gen = si.getDocValuesGen(fi.number); + List infos = genInfos.get(gen); + if (infos == null) { + infos = new ArrayList(); + genInfos.put(gen, infos); + } + infos.add(fi); + } + + for (Entry> e : genInfos.entrySet()) { + Long gen = e.getKey(); + List infos = e.getValue(); + DocValuesProducer dvp = genDVProducers.get(gen); + if (dvp == null) { + // set SegmentReadState to list only the fields that are relevant to that gen + Directory dvDir = cfsDir; + String segmentSuffix = ""; + if (gen.longValue() != -1) { + dvDir = si.info.dir; // gen'd files are written outside CFS, so use SegInfo directory + segmentSuffix = Long.toString(gen.longValue(), Character.MAX_RADIX); + } + + SegmentReadState srs = new SegmentReadState(dvDir, si.info, new FieldInfos(infos.toArray(new FieldInfo[infos.size()])), context, segmentSuffix); + dvp = dvFormat.fieldsProducer(srs); + assert dvp != null; + genDVProducers.put(gen, dvp); + } + for (FieldInfo fi : infos) { + dvProducers.put(fi.name, dvp); + } + } } if (fieldInfos.hasNorms()) { @@ -167,6 +216,10 @@ this.owner = owner; } + int getRefCount() { + return ref.get(); + } + void incRef() { ref.incrementAndGet(); } @@ -186,6 +239,7 @@ return null; } + DocValuesProducer dvProducer = dvProducers.get(field); assert dvProducer != null; Map dvFields = docValuesLocal.get(); @@ -214,6 +268,7 @@ return null; } + DocValuesProducer dvProducer = dvProducers.get(field); assert dvProducer != null; Map dvFields = docValuesLocal.get(); @@ -242,6 +297,7 @@ return null; } + DocValuesProducer dvProducer = dvProducers.get(field); assert dvProducer != null; Map dvFields = docValuesLocal.get(); @@ -270,6 +326,7 @@ return null; } + DocValuesProducer dvProducer = dvProducers.get(field); assert dvProducer != null; Map dvFields = docValuesLocal.get(); @@ -294,6 +351,7 @@ return null; } + DocValuesProducer dvProducer = dvProducers.get(field); assert dvProducer != null; Map dvFields = docsWithFieldLocal.get(); @@ -332,8 +390,13 @@ void decRef() throws IOException { if (ref.decrementAndGet() == 0) { - IOUtils.close(termVectorsLocal, fieldsReaderLocal, docValuesLocal, normsLocal, docsWithFieldLocal, fields, - dvProducer, termVectorsReaderOrig, fieldsReaderOrig, cfsReader, normsProducer); +// System.err.println("--- closing core readers"); + try { + IOUtils.close(termVectorsLocal, fieldsReaderLocal, docValuesLocal, normsLocal, fields, + termVectorsReaderOrig, fieldsReaderOrig, cfsReader, normsProducer); + } finally { + IOUtils.close(genDVProducers.values()); + } notifyCoreClosedListeners(); } } Index: lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java (working copy) @@ -119,7 +119,7 @@ public void setCodec(Codec codec) { assert this.codec == null; if (codec == null) { - throw new IllegalArgumentException("segmentCodecs must be non-null"); + throw new IllegalArgumentException("codec must be non-null"); } this.codec = codec; } @@ -170,7 +170,6 @@ * left off when there are no deletions).
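For each non-default generation the doc-values files are written outside the compound file, next to the segment, with the generation encoded as the segment suffix (the gen printed in Character.MAX_RADIX, as in the code above). A rough illustration with hypothetical values (segment "_0", gen 2, extension "dvd"; the real extension depends on the doc-values format in use):

  // Hypothetical values, for illustration only
  String suffix = Long.toString(2L, Character.MAX_RADIX);                 // "2"
  String fileName = IndexFileNames.segmentFileName("_0", suffix, "dvd");  // "_0_2.dvd"
  // each update generation therefore gets its own, separately named doc-values files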

*/ public String toString(Directory dir, int delCount) { - StringBuilder s = new StringBuilder(); s.append(name).append('(').append(version == null ? "?" : version).append(')').append(':'); char cfs = getUseCompoundFile() ? 'c' : 'C'; Index: lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java (working copy) @@ -19,7 +19,10 @@ import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.apache.lucene.store.Directory; @@ -27,9 +30,8 @@ * fields. * * @lucene.experimental */ - -public class SegmentInfoPerCommit { - +public class SegmentInfoPerCommit { // nocommit (RENAME) to SegmentCommit? + /** The {@link SegmentInfo} that we wrap. */ public final SegmentInfo info; @@ -44,15 +46,34 @@ // attempt to write: private long nextWriteDelGen; + // holds field.number to docValuesGen mapping + private final Map fieldDocValuesGens = new HashMap(); + + // Generation number of the docValues (-1 if there are no field updates) + private long docValuesGen; + + // Normally 1 + docValuesGen, unless an exception was hit on last attempt to + // write + private long nextWriteDocValuesGen; + + // Tracks the files with field updates + private Set updatesFiles = new HashSet(); + private volatile long sizeInBytes = -1; - /** Sole constructor. - * @param info {@link SegmentInfo} that we wrap - * @param delCount number of deleted documents in this segment - * @param delGen deletion generation number (used to name - deletion files) + /** + * Sole constructor. + * + * @param info + * {@link SegmentInfo} that we wrap + * @param delCount + * number of deleted documents in this segment + * @param delGen + * deletion generation number (used to name deletion files) + * @param docValuesGen + * doc-values generation number (used to name docvalues files) **/ - public SegmentInfoPerCommit(SegmentInfo info, int delCount, long delGen) { + public SegmentInfoPerCommit(SegmentInfo info, int delCount, long delGen, long docValuesGen) { this.info = info; this.delCount = delCount; this.delGen = delGen; @@ -61,8 +82,23 @@ } else { nextWriteDelGen = delGen+1; } + + this.docValuesGen = docValuesGen; + if (docValuesGen == -1) { + nextWriteDocValuesGen = 1; + } else { + nextWriteDocValuesGen = docValuesGen + 1; + } } + public Set getUpdatesFiles() { + return new HashSet(updatesFiles); + } + + public void addUpdatesFiles(Set files) { + updatesFiles.addAll(files); + } + /** Called when we succeed in writing deletes */ void advanceDelGen() { delGen = nextWriteDelGen; @@ -76,6 +112,21 @@ void advanceNextWriteDelGen() { nextWriteDelGen++; } + + /** Called when we succeed in writing docvalues updates */ + void advanceDocValuesGen() { + docValuesGen = nextWriteDocValuesGen; + nextWriteDocValuesGen = docValuesGen + 1; + sizeInBytes = -1; + } + + /** + * Called if there was an exception while writing docvalues updates, so that + * we don't try to write to the same file more than once. + */ + void advanceNextWriteDocValuesGen() { + nextWriteDocValuesGen++; + } /** Returns total size in bytes of all files for this * segment. 
*/ @@ -97,8 +148,13 @@ Collection files = new HashSet(info.files()); // Must separately add any live docs files: + // nocommit why do we do that, vs relying on TrackingDir.getCreatedFiles(), + // like we do for updates? info.getCodec().liveDocsFormat().files(this, files); + // Must separately add any field updates files + files.addAll(updatesFiles); + return files; } @@ -115,6 +171,7 @@ sizeInBytes = -1; } + // nocommit no one calls this, remove? void clearDelGen() { delGen = -1; sizeInBytes = -1; @@ -124,6 +181,7 @@ * Sets the generation number of the live docs file. * @see #getDelGen() */ + // nocommit no one calls this method, why do we have it? and if we need it, do we need one for docValuesGen too? public void setDelGen(long delGen) { this.delGen = delGen; sizeInBytes = -1; @@ -135,7 +193,43 @@ return delGen != -1; } + /** Returns true if there are any field updates for the segment in this commit. */ + public boolean hasFieldUpdates() { + return docValuesGen != -1; + } + + /** Returns the next available generation number of the docvalues files. */ + public long getNextDocValuesGen() { + return nextWriteDocValuesGen; + } + /** + * Returns the docvalues generation of this field, or -1 if there are + * no updates to it. + */ + public long getDocValuesGen(int fieldNumber) { + Long gen = fieldDocValuesGens.get(fieldNumber); + return gen == null ? -1 : gen.longValue(); + } + + /** Sets the docvalues generation for this field. */ + public void setDocValuesGen(int fieldNumber, long gen) { + fieldDocValuesGens.put(fieldNumber, gen); + } + + public Map getFieldDocValuesGens() { + return fieldDocValuesGens; + } + + /** + * Returns the generation number of the field infos file or -1 if there are no + * field updates yet. + */ + public long getDocValuesGen() { + return docValuesGen; + } + + /** * Returns the next available generation number * of the live docs file. */ @@ -174,17 +268,25 @@ if (delGen != -1) { s += ":delGen=" + delGen; } + if (docValuesGen != -1) { + s += ":docValuesGen=" + docValuesGen; + } return s; } @Override public SegmentInfoPerCommit clone() { - SegmentInfoPerCommit other = new SegmentInfoPerCommit(info, delCount, delGen); + SegmentInfoPerCommit other = new SegmentInfoPerCommit(info, delCount, delGen, docValuesGen); // Not clear that we need to carry over nextWriteDelGen // (i.e. 
do we ever clone after a failed write and // before the next successful write?), but just do it to // be safe: other.nextWriteDelGen = nextWriteDelGen; + other.nextWriteDocValuesGen = nextWriteDocValuesGen; + + other.updatesFiles.addAll(updatesFiles); + + other.fieldDocValuesGens.putAll(fieldDocValuesGens); return other; } } Index: lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java (working copy) @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.lucene.codecs.Codec; @@ -35,7 +36,7 @@ import org.apache.lucene.codecs.LiveDocsFormat; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.ChecksumIndexOutput; -import org.apache.lucene.store.DataOutput; // javadocs +import org.apache.lucene.store.DataOutput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -42,7 +43,6 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.NoSuchDirectoryException; import org.apache.lucene.util.IOUtils; -import org.apache.lucene.util.ThreadInterruptedException; /** * A collection of segmentInfo objects with methods for operating on @@ -111,11 +111,12 @@ */ public final class SegmentInfos implements Cloneable, Iterable { - /** - * The file format version for the segments_N codec header - */ + /** The file format version for the segments_N codec header, up to 4.4. */ public static final int VERSION_40 = 0; + /** The file format version for the segments_N codec header, since 4.5+. */ + public static final int VERSION_45 = 1; + /** Used for the segments.gen file only! * Whenever you add a new format, make it 1 smaller (negative version logic)! 
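As a summary of what VERSION_45 adds for each segment entry in segments_N, the sketch below mirrors the write changes further down in this hunk; the method and variable names are illustrative, only the field order and the SegmentInfoPerCommit accessors come from the patch:

  // Sketch of the per-segment record as written for VERSION_45 (not the actual method)
  void writeSegmentEntry(IndexOutput out, SegmentInfoPerCommit siPerCommit) throws IOException {
    SegmentInfo si = siPerCommit.info;
    out.writeString(si.name);
    out.writeString(si.getCodec().getName());
    out.writeLong(siPerCommit.getDelGen());
    out.writeInt(siPerCommit.getDelCount());
    out.writeLong(siPerCommit.getDocValuesGen());             // new: -1 if the segment has no field updates
    Map<Integer,Long> fieldGens = siPerCommit.getFieldDocValuesGens();
    out.writeInt(fieldGens.size());                           // new: number of (fieldNumber, gen) pairs
    for (Map.Entry<Integer,Long> e : fieldGens.entrySet()) {
      out.writeInt(e.getKey());
      out.writeLong(e.getValue());
    }
    out.writeStringSet(siPerCommit.getUpdatesFiles());        // new: files created by doc-values updates
  }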
*/ public static final int FORMAT_SEGMENTS_GEN_CURRENT = -2; @@ -319,7 +320,7 @@ throw new IndexFormatTooOldException(input, magic, CodecUtil.CODEC_MAGIC, CodecUtil.CODEC_MAGIC); } // 4.0+ - CodecUtil.checkHeaderNoMagic(input, "segments", VERSION_40, VERSION_40); + int format = CodecUtil.checkHeaderNoMagic(input, "segments", VERSION_40, VERSION_45); version = input.readLong(); counter = input.readInt(); int numSegments = input.readInt(); @@ -326,7 +327,7 @@ if (numSegments < 0) { throw new CorruptIndexException("invalid segment count: " + numSegments + " (resource: " + input + ")"); } - for(int seg=0;seg= VERSION_45) { + docValuesGen = input.readLong(); + } + SegmentInfoPerCommit siPerCommit = new SegmentInfoPerCommit(info, delCount, delGen, docValuesGen); + if (format >= VERSION_45) { + int numUpdates = input.readInt(); + for (int i = 0; i < numUpdates; i++) { + siPerCommit.setDocValuesGen(input.readInt(), input.readLong()); + } + siPerCommit.addUpdatesFiles(input.readStringSet()); + } + add(siPerCommit); } userData = input.readStringStringMap(); @@ -395,7 +408,7 @@ try { segnOutput = new ChecksumIndexOutput(directory.createOutput(segmentFileName, IOContext.DEFAULT)); - CodecUtil.writeHeader(segnOutput, "segments", VERSION_40); + CodecUtil.writeHeader(segnOutput, "segments", VERSION_45); segnOutput.writeLong(version); segnOutput.writeInt(counter); // write counter segnOutput.writeInt(size()); // write infos @@ -405,6 +418,14 @@ segnOutput.writeString(si.getCodec().getName()); segnOutput.writeLong(siPerCommit.getDelGen()); segnOutput.writeInt(siPerCommit.getDelCount()); + segnOutput.writeLong(siPerCommit.getDocValuesGen()); + Map docValuesUpdatesGen = siPerCommit.getFieldDocValuesGens(); + segnOutput.writeInt(docValuesUpdatesGen.size()); + for (Entry e : docValuesUpdatesGen.entrySet()) { + segnOutput.writeInt(e.getKey()); + segnOutput.writeLong(e.getValue()); + } + segnOutput.writeStringSet(siPerCommit.getUpdatesFiles()); assert si.dir == directory; assert siPerCommit.getDelCount() <= si.getDocCount(); @@ -805,7 +826,7 @@ final String segmentFileName = getSegmentsFileName(); if (segmentFileName != null) { /* - * TODO: if lastGen == -1 we get might get null here it seems wrong to + * TODO: if lastGen == -1 we might get null here it seems wrong to * add null to the files set */ files.add(segmentFileName); Index: lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java (working copy) @@ -71,16 +71,25 @@ * to {@link Directory#createOutput(String,IOContext)}. */ public final IOContext context; + /** True is this instance represents a field update. */ + public final boolean isFieldUpdate; // nocommit if we gen FieldInfos, get rid of this + /** Sole constructor. 
*/ public SegmentWriteState(InfoStream infoStream, Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, BufferedDeletes segDeletes, IOContext context) { + this(infoStream, directory, segmentInfo, fieldInfos, segDeletes, context, "", false); + } + + public SegmentWriteState(InfoStream infoStream, Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, + BufferedDeletes segDeletes, IOContext context, String segmentSuffix, boolean isFieldUpdate) { this.infoStream = infoStream; this.segDeletes = segDeletes; this.directory = directory; this.segmentInfo = segmentInfo; this.fieldInfos = fieldInfos; - segmentSuffix = ""; + this.segmentSuffix = segmentSuffix; this.context = context; + this.isFieldUpdate = isFieldUpdate; } /** Create a shallow copy of {@link SegmentWriteState} with a new segment suffix. */ @@ -93,5 +102,6 @@ this.segmentSuffix = segmentSuffix; segDeletes = state.segDeletes; delCountOnFlush = state.delCountOnFlush; + isFieldUpdate = state.isFieldUpdate; } } Index: lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java (revision 1519637) +++ lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java (working copy) @@ -162,7 +162,8 @@ readerShared[i] = false; newReaders[i] = newReader; } else { - if (newReaders[i].getSegmentInfo().getDelGen() == infos.info(i).getDelGen()) { + if (newReaders[i].getSegmentInfo().getDelGen() == infos.info(i).getDelGen() + && newReaders[i].getSegmentInfo().getDocValuesGen() == infos.info(i).getDocValuesGen()) { // No change; this reader will be shared between // the old and the new one, so we must incRef // it: @@ -172,8 +173,14 @@ readerShared[i] = false; // Steal the ref returned by SegmentReader ctor: assert infos.info(i).info.dir == newReaders[i].getSegmentInfo().info.dir; - assert infos.info(i).hasDeletions(); - newReaders[i] = new SegmentReader(infos.info(i), newReaders[i].core, IOContext.READ); + assert infos.info(i).hasDeletions() || infos.info(i).hasFieldUpdates(); + if (newReaders[i].getSegmentInfo().getDocValuesGen() == infos.info(i).getDocValuesGen()) { + // only deletes have changed + newReaders[i] = new SegmentReader(infos.info(i), newReaders[i].core, IOContext.READ); + } else { + // fields were updated: open a new SegmentCoreReader + newReaders[i] = new SegmentReader(infos.info(i), IOContext.READ); + } } } success = true; Index: lucene/core/src/java/org/apache/lucene/util/RefCount.java =================================================================== --- lucene/core/src/java/org/apache/lucene/util/RefCount.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/util/RefCount.java (working copy) @@ -0,0 +1,83 @@ +package org.apache.lucene.util; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Manages reference counting for a given object. Extensions can override + * {@link #release()} to do custom logic when reference counting hits 0. + */ +public class RefCount { + + private final AtomicInteger refCount = new AtomicInteger(1); + private final T object; + + public RefCount(T object) { + this.object = object; + } + + /** + * Called when reference counting hits 0. By default this method does nothing, + * but extensions can override to e.g. release resources attached to object + * that is managed by this class. + */ + protected void release() throws IOException {} + + /** + * Decrements the reference counting of this object. When reference counting + * hits 0, calls {@link #release()}. + */ + public final void decRef() throws IOException { + final int rc = refCount.decrementAndGet(); + if (rc == 0) { + boolean success = false; + try { + release(); + success = true; + } finally { + if (!success) { + // Put reference back on failure + refCount.incrementAndGet(); + } + } + } else if (rc < 0) { + throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement"); + } + } + + public final T get() { + return object; + } + + /** Returns the current reference count. */ + public final int getRefCount() { + return refCount.get(); + } + + /** + * Increments the reference count. Calls to this method must be matched with + * calls to {@link #decRef()}. 
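Because RefCount is a new general-purpose utility, a short usage sketch may help. Everything except the RefCount calls is hypothetical here (the directory, file name and IndexInput are just stand-ins for any shared resource):

  // Hypothetical usage: share one IndexInput and close it exactly once,
  // when the last reference is dropped.
  final IndexInput in = dir.openInput("_0_1.dvd", IOContext.READ);
  RefCount<IndexInput> ref = new RefCount<IndexInput>(in) {
    @Override
    protected void release() throws IOException {
      get().close();        // runs only when the count reaches 0
    }
  };
  ref.incRef();             // a second consumer shares the same input
  ref.decRef();             // first consumer done; count back to 1, nothing released yet
  ref.decRef();             // last reference dropped; release() closes the input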
+ */ + public final void incRef() { + refCount.incrementAndGet(); + } + +} + Index: lucene/core/src/test/org/apache/lucene/index/TestDoc.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestDoc.java (revision 1519637) +++ lucene/core/src/test/org/apache/lucene/index/TestDoc.java (working copy) @@ -239,7 +239,7 @@ } } - return new SegmentInfoPerCommit(info, 0, -1L); + return new SegmentInfoPerCommit(info, 0, -1L, -1L); } Index: lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java (revision 0) +++ lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java (working copy) @@ -0,0 +1,1018 @@ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.asserting.AssertingDocValuesFormat; +import org.apache.lucene.codecs.lucene40.Lucene40RWCodec; +import org.apache.lucene.codecs.lucene41.Lucene41RWCodec; +import org.apache.lucene.codecs.lucene42.Lucene42RWCodec; +import org.apache.lucene.codecs.lucene45.Lucene45Codec; +import org.apache.lucene.codecs.lucene45.Lucene45DocValuesFormat; +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; +import org.junit.Test; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +@SuppressCodecs({"Lucene40","Lucene41","Lucene42"}) +public class TestNumericDocValuesUpdates extends LuceneTestCase { + + // nocommit add test for index sorting and field updates + + private Document doc(int id) { + Document doc = new Document(); + doc.add(new StringField("id", "doc-" + id, Store.NO)); + // make sure we don't set the doc's value to 0, to not confuse with a document that's missing values + doc.add(new NumericDocValuesField("val", id + 1)); + return doc; + } + + @Test + public void testSimple() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + // make sure random config doesn't flush on us + conf.setMaxBufferedDocs(10); + conf.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH); + IndexWriter writer = new IndexWriter(dir, conf); + writer.addDocument(doc(0)); // val=1 + writer.addDocument(doc(1)); // val=2 + if (random().nextBoolean()) { // randomly commit before the update is sent + writer.commit(); + } + writer.updateNumericDocValue(new Term("id", "doc-0"), "val", 2L); // doc=0, exp=2 + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + assertEquals(1, reader.leaves().size()); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("val"); + assertEquals(2, ndv.get(0)); + assertEquals(2, ndv.get(1)); + reader.close(); + + dir.close(); + } + + @Test + public void testUpdateFewSegments() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(2); // generate few segments + conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); // prevent merges for this test + IndexWriter writer = new IndexWriter(dir, conf); + int numDocs = 10; + long[] expectedValues = new long[numDocs]; + for (int i = 0; i < numDocs; i++) { + writer.addDocument(doc(i)); + expectedValues[i] = i + 1; + } + writer.commit(); + + // update few docs + for (int i = 0; i < numDocs; i++) { + if (random().nextDouble() < 0.4) { + long value = (i + 1) * 2; + writer.updateNumericDocValue(new Term("id", "doc-" + i), "val", value); + expectedValues[i] = value; + } + } + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + NumericDocValues ndv = r.getNumericDocValues("val"); + assertNotNull(ndv); + for (int i = 0; i < r.maxDoc(); i++) { + long expected = expectedValues[i + context.docBase]; + long actual = ndv.get(i); + assertEquals(expected, actual); + } + } + + reader.close(); + dir.close(); + } + + @Test + public void testReopen() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + final boolean isNRT = random().nextBoolean(); + final DirectoryReader reader1; + if (isNRT) { + reader1 = DirectoryReader.open(writer, true); + } else { + writer.commit(); + reader1 = DirectoryReader.open(dir); + } + + // update doc 
+ writer.updateNumericDocValue(new Term("id", "doc-0"), "val", 10L); // update doc-0's value to 10 + if (!isNRT) { + writer.commit(); + } + + // reopen reader and assert only it sees the update + final DirectoryReader reader2 = DirectoryReader.openIfChanged(reader1); + assertNotNull(reader2); + assertTrue(reader1 != reader2); + + assertEquals(1, reader1.leaves().get(0).reader().getNumericDocValues("val").get(0)); + assertEquals(10, reader2.leaves().get(0).reader().getNumericDocValues("val").get(0)); + + IOUtils.close(writer, reader1, reader2, dir); + } + + @Test + public void testUpdatesAndDeletes() throws Exception { + // create an index with a segment with only deletes, a segment with both + // deletes and updates and a segment with only updates + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // control segment flushing + conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); // prevent merges for this test + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 6; i++) { + writer.addDocument(doc(i)); + if (i % 2 == 1) { + writer.commit(); // create 2-docs segments + } + } + + // delete doc-1 and doc-2 + writer.deleteDocuments(new Term("id", "doc-1"), new Term("id", "doc-2")); // 1st and 2nd segments + + // update docs 3 and 5 + writer.updateNumericDocValue(new Term("id", "doc-3"), "val", 17L); + writer.updateNumericDocValue(new Term("id", "doc-5"), "val", 17L); + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + AtomicReader slow = SlowCompositeReaderWrapper.wrap(reader); + + Bits liveDocs = slow.getLiveDocs(); + boolean[] expectedLiveDocs = new boolean[] { true, false, false, true, true, true }; + for (int i = 0; i < expectedLiveDocs.length; i++) { + assertEquals(expectedLiveDocs[i], liveDocs.get(i)); + } + + long[] expectedValues = new long[] { 1, 2, 3, 17, 5, 17}; + NumericDocValues ndv = slow.getNumericDocValues("val"); + for (int i = 0; i < expectedValues.length; i++) { + assertEquals(expectedValues[i], ndv.get(i)); + } + + reader.close(); + dir.close(); + } + + @Test + public void testUpdatesWithDeletes() throws Exception { + // update and delete different documents in the same commit session + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // control segment flushing + IndexWriter writer = new IndexWriter(dir, conf); + + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + if (random().nextBoolean()) { + writer.commit(); + } + + writer.deleteDocuments(new Term("id", "doc-0")); + writer.updateNumericDocValue(new Term("id", "doc-1"), "val", 17L); + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + AtomicReader r = reader.leaves().get(0).reader(); + assertFalse(r.getLiveDocs().get(0)); + assertEquals(17, r.getNumericDocValues("val").get(1)); + + reader.close(); + dir.close(); + } + + @Test + public void testUpdateAndDeleteSameDocument() throws Exception { + // update and delete same document in same commit session + Directory dir = newDirectory(); + IndexWriterConfig conf = 
newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // control segment flushing + IndexWriter writer = new IndexWriter(dir, conf); + + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + if (random().nextBoolean()) { + writer.commit(); + } + + writer.deleteDocuments(new Term("id", "doc-0")); + writer.updateNumericDocValue(new Term("id", "doc-0"), "val", 17L); + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + AtomicReader r = reader.leaves().get(0).reader(); + assertFalse(r.getLiveDocs().get(0)); + assertEquals(1, r.getNumericDocValues("val").get(0)); // deletes are currently applied first + + reader.close(); + dir.close(); + } + + @Test + public void testMultipleDocValuesTypes() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // prevent merges + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 4; i++) { + Document doc = new Document(); + doc.add(new StringField("dvUpdateKey", "dv", Store.NO)); + doc.add(new NumericDocValuesField("ndv", i)); + doc.add(new BinaryDocValuesField("bdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedDocValuesField("sdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedSetDocValuesField("ssdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedSetDocValuesField("ssdv", new BytesRef(Integer.toString(i * 2)))); + writer.addDocument(doc); + } + writer.commit(); + + // update all docs' ndv field + writer.updateNumericDocValue(new Term("dvUpdateKey", "dv"), "ndv", 17L); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + SortedDocValues sdv = r.getSortedDocValues("sdv"); + SortedSetDocValues ssdv = r.getSortedSetDocValues("ssdv"); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, ndv.get(0)); + bdv.get(i, scratch); + assertEquals(new BytesRef(Integer.toString(i)), scratch); + sdv.get(i, scratch); + assertEquals(new BytesRef(Integer.toString(i)), scratch); + ssdv.setDocument(i); + long ord = ssdv.nextOrd(); + ssdv.lookupOrd(ord, scratch); + assertEquals(i, Integer.parseInt(scratch.utf8ToString())); + if (i != 0) { + ord = ssdv.nextOrd(); + ssdv.lookupOrd(ord, scratch); + assertEquals(i * 2, Integer.parseInt(scratch.utf8ToString())); + } + assertEquals(SortedSetDocValues.NO_MORE_ORDS, ssdv.nextOrd()); + } + + reader.close(); + dir.close(); + } + + @Test + public void testMultipleNumericDocValues() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // prevent merges + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 2; i++) { + Document doc = new Document(); + doc.add(new StringField("dvUpdateKey", "dv", Store.NO)); + doc.add(new NumericDocValuesField("ndv1", i)); + doc.add(new NumericDocValuesField("ndv2", i)); + writer.addDocument(doc); + } + writer.commit(); + + // update all docs' ndv1 field + writer.updateNumericDocValue(new Term("dvUpdateKey", "dv"), "ndv1", 17L); + 
writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv1 = r.getNumericDocValues("ndv1"); + NumericDocValues ndv2 = r.getNumericDocValues("ndv2"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, ndv1.get(i)); + assertEquals(i, ndv2.get(i)); + } + + reader.close(); + dir.close(); + } + + @Test + public void testDocumentWithNoValue() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 2; i++) { + Document doc = new Document(); + doc.add(new StringField("dvUpdateKey", "dv", Store.NO)); + if (i == 0) { // index only one document with value + doc.add(new NumericDocValuesField("ndv", 5)); + } + writer.addDocument(doc); + } + writer.commit(); + + // update all docs' ndv field + writer.updateNumericDocValue(new Term("dvUpdateKey", "dv"), "ndv", 17L); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, ndv.get(i)); + } + + reader.close(); + dir.close(); + } + + @Test + public void testUnsetValue() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 2; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc" + i, Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); + } + writer.commit(); + + // unset the value of 'doc0' + writer.updateNumericDocValue(new Term("id", "doc0"), "ndv", null); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + for (int i = 0; i < r.maxDoc(); i++) { + if (i == 0) { + assertEquals(0, ndv.get(i)); + } else { + assertEquals(5, ndv.get(i)); + } + } + + Bits docsWithField = r.getDocsWithField("ndv"); + assertFalse(docsWithField.get(0)); + assertTrue(docsWithField.get(1)); + + reader.close(); + dir.close(); + } + + @Test + public void testUpdateNonDocValueField() throws Exception { + // we don't support adding new fields or updating existing non-numeric-dv + // fields through numeric updates + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new StringField("foo", "bar", Store.NO)); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + try { + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", 17L); + fail("should not have allowed creating new fields through update"); + } catch (IllegalArgumentException e) { + // ok + } + + try { + writer.updateNumericDocValue(new Term("key", "doc"), "foo", 17L); + fail("should not have allowed updating an existing field to numeric-dv"); + } catch (IllegalArgumentException e) { + // ok + } + + writer.close(); + dir.close(); + } + + @Test + public void testDifferentDVFormatPerField() throws Exception { + Directory dir 
= newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setCodec(new Lucene45Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new Lucene45DocValuesFormat(); + } + }); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + doc.add(new SortedDocValuesField("sorted", new BytesRef("value"))); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", 17L); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + + AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + SortedDocValues sdv = r.getSortedDocValues("sorted"); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, ndv.get(i)); + sdv.get(i, scratch); + assertEquals(new BytesRef("value"), scratch); + } + + reader.close(); + dir.close(); + } + + @Test + public void testUpdateSameDocMultipleTimes() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", 17L); // update existing field + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", 3L); // update existing field 2nd time in this commit + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + final AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(3, ndv.get(i)); + } + reader.close(); + dir.close(); + } + + @Test + public void testSegmentMerges() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setReaderPooling(random().nextBoolean()); + IndexWriter writer = new IndexWriter(dir, conf.clone()); + + int numRounds = atLeast(3); + for (int rnd = 0; rnd < numRounds; rnd++) { + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new NumericDocValuesField("ndv", -1)); + for (int i = 0; i < 2; i++) { // create two segments + writer.addDocument(doc); + } + + long value = rnd + 1; + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", value); // update existing field + + // randomly commit or reopen-IW or none, before forceMerge + if (random().nextBoolean()) { // commit before forceMerge + writer.commit(); + } else if (random().nextBoolean()) { + writer.close(); + writer = new IndexWriter(dir, conf.clone()); + } + + writer.forceMerge(1); + final DirectoryReader reader; + if (random().nextBoolean()) { + writer.commit(); + reader = DirectoryReader.open(dir); + } else { + reader = DirectoryReader.open(writer, true); + } + + assertEquals(1, reader.leaves().size()); + final AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = 
r.getNumericDocValues("ndv"); + assertNotNull(ndv); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(value, ndv.get(i)); + } + reader.close(); + } + + writer.close(); + dir.close(); + } + + @Test + public void testUpdateDocumentByMultipleTerms() throws Exception { + // make sure the order of updates is respected, even when multiple terms affect same document + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("k1", "v1", Store.NO)); + doc.add(new StringField("k2", "v2", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateNumericDocValue(new Term("k1", "v1"), "ndv", 17L); + writer.updateNumericDocValue(new Term("k2", "v2"), "ndv", 3L); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + final AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(3, ndv.get(i)); + } + reader.close(); + dir.close(); + } + + @Test + public void testManyReopensAndFields() throws Exception { + Directory dir = newDirectory(); + final Random random = random(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); + LogMergePolicy lmp = newLogMergePolicy(); + lmp.setMergeFactor(3); // merge often + conf.setMergePolicy(lmp); + IndexWriter writer = new IndexWriter(dir, conf); + + final boolean isNRT = random.nextBoolean(); + DirectoryReader reader; + if (isNRT) { + reader = DirectoryReader.open(writer, true); + } else { + writer.commit(); + reader = DirectoryReader.open(dir); + } + + final int numFields = random.nextInt(4) + 3; // 3-7 + final long[] fieldValues = new long[numFields]; + for (int i = 0; i < fieldValues.length; i++) { + fieldValues[i] = 1; + } + + int numRounds = atLeast(15); + int docID = 0; + for (int i = 0; i < numRounds; i++) { + int numDocs = atLeast(2); + // System.out.println("round=" + i + ", numDocs=" + numDocs); + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc-" + docID, Store.NO)); + doc.add(new StringField("key", "all", Store.NO)); // update key + // add all fields with their current (updated value) + for (int f = 0; f < fieldValues.length; f++) { + doc.add(new NumericDocValuesField("f" + f, fieldValues[f])); + } + writer.addDocument(doc); + ++docID; + } + + int fieldIdx = random.nextInt(fieldValues.length); + String updateField = "f" + fieldIdx; + writer.updateNumericDocValue(new Term("key", "all"), updateField, ++fieldValues[fieldIdx]); + // System.out.println("+++ updated field '" + updateField + "' to value " + fieldValues[fieldIdx]); + + if (random.nextDouble() < 0.2) { + int deleteDoc = random.nextInt(docID); // might also delete an already deleted document, ok! 
+ writer.deleteDocuments(new Term("id", "doc-" + deleteDoc)); + // System.out.println("--- deleted document: doc-" + deleteDoc); + } + + // verify reader + if (!isNRT) { + writer.commit(); + } + + DirectoryReader newReader = DirectoryReader.openIfChanged(reader); + assertNotNull(newReader); + reader.close(); + reader = newReader; + assertTrue(reader.numDocs() > 0); // we delete at most one document per round + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + // System.out.println(((SegmentReader) r).getSegmentName()); + Bits liveDocs = r.getLiveDocs(); + for (int field = 0; field < fieldValues.length; field++) { + String f = "f" + field; + NumericDocValues ndv = r.getNumericDocValues(f); + assertNotNull(ndv); + int maxDoc = r.maxDoc(); + for (int doc = 0; doc < maxDoc; doc++) { + if (liveDocs == null || liveDocs.get(doc)) { + // System.out.println("doc=" + (doc + context.docBase) + " f='" + f + "' vslue=" + ndv.get(doc)); + assertEquals("invalid value for doc=" + (doc + context.docBase) + ", field=" + f, fieldValues[field], ndv.get(doc)); + } + } + } + } + // System.out.println(); + } + + IOUtils.close(writer, reader, dir); + } + + @Test + public void testUpdateSegmentWithNoDocValues() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + // first segment with NDV + Document doc = new Document(); + doc.add(new StringField("id", "doc0", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); + writer.commit(); + + // second segment with no NDV + doc = new Document(); + doc.add(new StringField("id", "doc1", Store.NO)); + writer.addDocument(doc); + writer.commit(); + + // update documentin the second segment + writer.updateNumericDocValue(new Term("id", "doc1"), "ndv", 5L); + try { + writer.close(); + fail("should not have succeeded updating a segment with no numeric DocValues field"); + } catch (UnsupportedOperationException e) { + // expected + writer.rollback(); + } + + dir.close(); + } + + @Test + public void testUpdateSegmentWithPostingButNoDocValues() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + // first segment with NDV + Document doc = new Document(); + doc.add(new StringField("id", "doc0", Store.NO)); + doc.add(new StringField("ndv", "mock-value", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); + writer.commit(); + + // second segment with no NDV + doc = new Document(); + doc.add(new StringField("id", "doc1", Store.NO)); + doc.add(new StringField("ndv", "mock-value", Store.NO)); + writer.addDocument(doc); + writer.commit(); + + // update documentin the second segment + writer.updateNumericDocValue(new Term("id", "doc1"), "ndv", 5L); + try { + writer.close(); + fail("should not have succeeded updating a segment with no numeric DocValues field"); + } catch (UnsupportedOperationException e) { + // expected + writer.rollback(); + } + + dir.close(); + } + + @Test + public void testUpdateNumericDVFieldWithSameNameAsPostingField() throws Exception { + // this used to fail because FieldInfos.Builder neglected to update + // globalFieldMaps.docValueTypes map + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new 
MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("f", "mock-value", Store.NO)); + doc.add(new NumericDocValuesField("f", 5)); + writer.addDocument(doc); + writer.commit(); + writer.updateNumericDocValue(new Term("f", "mock-value"), "f", 17L); + writer.close(); + + DirectoryReader r = DirectoryReader.open(dir); + NumericDocValues ndv = r.leaves().get(0).reader().getNumericDocValues("f"); + assertEquals(17, ndv.get(0)); + r.close(); + + dir.close(); + } + + @Test + public void testUpdateOldSegments() throws Exception { + Codec[] oldCodecs = new Codec[] { new Lucene40RWCodec(), new Lucene41RWCodec(), new Lucene42RWCodec() }; + Directory dir = newDirectory(); + + // create a segment with an old Codec + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setCodec(oldCodecs[random().nextInt(oldCodecs.length)]); + IndexWriter writer = new IndexWriter(dir, conf); + Document doc = new Document(); + doc.add(new StringField("id", "doc", Store.NO)); + doc.add(new NumericDocValuesField("f", 5)); + writer.addDocument(doc); + writer.close(); + + conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + writer = new IndexWriter(dir, conf); + writer.updateNumericDocValue(new Term("id", "doc"), "f", 4L); + OLD_FORMAT_IMPERSONATION_IS_ACTIVE = false; + try { + writer.close(); + fail("should not have succeeded to update a segment written with an old Codec"); + } catch (UnsupportedOperationException e) { + writer.rollback(); + } finally { + OLD_FORMAT_IMPERSONATION_IS_ACTIVE = true; + } + + dir.close(); + } + + @Test + public void testStressMultiThreading() throws Exception { + final Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(2); + final IndexWriter writer = new IndexWriter(dir, conf); + + // create index + final int numThreads = atLeast(3); + final int numDocs = atLeast(2000); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + double group = random().nextDouble(); + String g; + if (group < 0.1) g = "g0"; + else if (group < 0.5) g = "g1"; + else if (group < 0.8) g = "g2"; + else g = "g3"; + doc.add(new StringField("updKey", g, Store.NO)); + for (int j = 0; j < numThreads; j++) { + long value = random().nextInt(); + doc.add(new NumericDocValuesField("f" + j, value)); + doc.add(new NumericDocValuesField("cf" + j, value * 2)); // control, always updated to f * 2 + } + writer.addDocument(doc); + } + + final CountDownLatch done = new CountDownLatch(numThreads); + + // same thread updates a field as well as reopens + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < threads.length; i++) { + final String f = "f" + i; + final String cf = "cf" + i; + threads[i] = new Thread("UpdateThread-" + i) { + @Override + public void run() { + try { + int numUpdates = atLeast(40); + Random random = random(); + while (numUpdates-- > 0) { + double group = random.nextDouble(); + Term t; + if (group < 0.1) t = new Term("updKey", "g0"); + else if (group < 0.5) t = new Term("updKey", "g1"); + else if (group < 0.8) t = new Term("updKey", "g2"); + else t = new Term("updKey", "g3"); + long updValue = random.nextInt(); + writer.updateNumericDocValue(t, f, updValue); + writer.updateNumericDocValue(t, cf, updValue * 2); + + if (random.nextDouble() < 0.2) { + // delete a random document + int doc = random.nextInt(numDocs); + 
writer.deleteDocuments(new Term("id", "doc" + doc)); + } + + if (random.nextDouble() < 0.1) { + writer.commit(); // rarely commit + } + + if (random.nextDouble() < 0.3) { // obtain NRT reader (apply updates) + DirectoryReader.open(writer, true).close(); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + done.countDown(); + } + } + }; + } + + for (Thread t : threads) t.start(); + done.await(); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + for (int i = 0; i < numThreads; i++) { + NumericDocValues ndv = r.getNumericDocValues("f" + i); + NumericDocValues control = r.getNumericDocValues("cf" + i); + Bits liveDocs = r.getLiveDocs(); + for (int j = 0; j < r.maxDoc(); j++) { + if (liveDocs == null || liveDocs.get(j)) { + assertEquals(control.get(j), ndv.get(j) * 2); + } + } + } + } + reader.close(); + + dir.close(); + } + + @Test + public void testUpdateDifferentDocsInDifferentGens() throws Exception { + // update same document multiple times across generations + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(4); + IndexWriter writer = new IndexWriter(dir, conf); + final int numDocs = atLeast(10); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc" + i, Store.NO)); + long value = random().nextInt(); + doc.add(new NumericDocValuesField("f", value)); + doc.add(new NumericDocValuesField("cf", value * 2)); + writer.addDocument(doc); + } + + int numGens = atLeast(5); + for (int i = 0; i < numGens; i++) { + int doc = random().nextInt(numDocs); + Term t = new Term("id", "doc" + doc); + long value = random().nextLong(); + writer.updateNumericDocValue(t, "f", value); + writer.updateNumericDocValue(t, "cf", value * 2); + DirectoryReader reader = DirectoryReader.open(writer, true); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + NumericDocValues fndv = r.getNumericDocValues("f"); + NumericDocValues cfndv = r.getNumericDocValues("cf"); + for (int j = 0; j < r.maxDoc(); j++) { + assertEquals(cfndv.get(j), fndv.get(j) * 2); + } + } + reader.close(); + } + writer.close(); + dir.close(); + } + + @Test + public void testChangeCodec() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setCodec(new Lucene45Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new Lucene45DocValuesFormat(); + } + }); + IndexWriter writer = new IndexWriter(dir, conf.clone()); + Document doc = new Document(); + doc.add(new StringField("id", "d0", Store.NO)); + doc.add(new NumericDocValuesField("f1", 5L)); + doc.add(new NumericDocValuesField("f2", 13L)); + writer.addDocument(doc); + writer.close(); + + // change format + conf.setCodec(new Lucene45Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new AssertingDocValuesFormat(); + } + }); + writer = new IndexWriter(dir, conf.clone()); + doc = new Document(); + doc.add(new StringField("id", "d1", Store.NO)); + doc.add(new NumericDocValuesField("f1", 17L)); + doc.add(new NumericDocValuesField("f2", 2L)); + writer.addDocument(doc); + writer.updateNumericDocValue(new Term("id", "d0"), "f1", 12L); + writer.close(); + + 
DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + NumericDocValues f1 = r.getNumericDocValues("f1"); + NumericDocValues f2 = r.getNumericDocValues("f2"); + assertEquals(12L, f1.get(0)); + assertEquals(13L, f2.get(0)); + assertEquals(17L, f1.get(1)); + assertEquals(2L, f2.get(1)); + reader.close(); + dir.close(); + } + +} Index: lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java (revision 1519637) +++ lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java (working copy) @@ -90,7 +90,7 @@ SegmentReader mergedReader = new SegmentReader(new SegmentInfoPerCommit( new SegmentInfo(mergedDir, Constants.LUCENE_MAIN_VERSION, mergedSegment, docsMerged, false, codec, null, null), - 0, -1L), + 0, -1L, -1L), newIOContext(random())); assertTrue(mergedReader != null); assertTrue(mergedReader.numDocs() == 2); Index: lucene/misc/src/java/org/apache/lucene/index/IndexSplitter.java =================================================================== --- lucene/misc/src/java/org/apache/lucene/index/IndexSplitter.java (revision 1519637) +++ lucene/misc/src/java/org/apache/lucene/index/IndexSplitter.java (working copy) @@ -141,7 +141,7 @@ SegmentInfo newInfo = new SegmentInfo(destFSDir, info.getVersion(), info.name, info.getDocCount(), info.getUseCompoundFile(), info.getCodec(), info.getDiagnostics(), info.attributes()); - destInfos.add(new SegmentInfoPerCommit(newInfo, infoPerCommit.getDelCount(), infoPerCommit.getDelGen())); + destInfos.add(new SegmentInfoPerCommit(newInfo, infoPerCommit.getDelCount(), infoPerCommit.getDelGen(), infoPerCommit.getDocValuesGen())); // now copy files over Collection files = infoPerCommit.files(); for (final String srcName : files) {