Index: lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java =================================================================== --- lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java (revision 1523412) +++ lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java (working copy) @@ -17,6 +17,16 @@ * limitations under the License. */ +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.END; +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.FIELD; +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.LENGTH; +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.MAXLENGTH; +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.MINVALUE; +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.NUMVALUES; +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.ORDPATTERN; +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.PATTERN; +import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.TYPE; + import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; @@ -31,11 +41,11 @@ import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfo.DocValuesType; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.index.FieldInfo.DocValuesType; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.Bits; @@ -42,16 +52,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.StringHelper; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.END; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.FIELD; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.LENGTH; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.MAXLENGTH; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.MINVALUE; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.NUMVALUES; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.ORDPATTERN; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.PATTERN; -import static org.apache.lucene.codecs.simpletext.SimpleTextDocValuesWriter.TYPE; - class SimpleTextDocValuesReader extends DocValuesProducer { static class OneField { @@ -62,8 +62,7 @@ boolean fixedLength; long minValue; long numValues; - - }; + } final int maxDoc; final IndexInput data; @@ -71,7 +70,7 @@ final Map fields = new HashMap(); public SimpleTextDocValuesReader(SegmentReadState state, String ext) throws IOException { - //System.out.println("dir=" + state.directory + " seg=" + state.segmentInfo.name + " ext=" + ext); + // System.out.println("dir=" + state.directory + " seg=" + state.segmentInfo.name + " file=" + IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext)); data = state.directory.openInput(IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext), state.context); maxDoc = 
state.segmentInfo.getDocCount(); while(true) { @@ -83,8 +82,6 @@ assert startsWith(FIELD) : scratch.utf8ToString(); String fieldName = stripPrefix(FIELD); //System.out.println(" field=" + fieldName); - FieldInfo fieldInfo = state.fieldInfos.fieldInfo(fieldName); - assert fieldInfo != null; OneField field = new OneField(); fields.put(fieldName, field); Index: lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java =================================================================== --- lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java (revision 1523412) +++ lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesWriter.java (working copy) @@ -55,7 +55,7 @@ private final Set fieldsSeen = new HashSet(); // for asserting public SimpleTextDocValuesWriter(SegmentWriteState state, String ext) throws IOException { - //System.out.println("WRITE: " + IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext) + " " + state.segmentInfo.getDocCount() + " docs"); + // System.out.println("WRITE: " + IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext) + " " + state.segmentInfo.getDocCount() + " docs"); data = state.directory.createOutput(IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, ext), state.context); numDocs = state.segmentInfo.getDocCount(); } Index: lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java =================================================================== --- lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java (working copy) @@ -22,13 +22,13 @@ import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Map; -import java.util.ServiceLoader; // javadocs +import java.util.ServiceLoader; import java.util.TreeMap; -import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.DocValuesProducer; -import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.NumericDocValues; @@ -76,11 +76,10 @@ } @Override - public final DocValuesConsumer fieldsConsumer(SegmentWriteState state) - throws IOException { + public final DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { return new FieldsWriter(state); } - + static class ConsumerAndSuffix implements Closeable { DocValuesConsumer consumer; int suffix; @@ -97,7 +96,7 @@ private final Map suffixes = new HashMap(); private final SegmentWriteState segmentWriteState; - + public FieldsWriter(SegmentWriteState state) { segmentWriteState = state; } @@ -123,7 +122,14 @@ } private DocValuesConsumer getInstance(FieldInfo field) throws IOException { - final DocValuesFormat format = getDocValuesFormatForField(field.name); + final DocValuesFormat format; + if (segmentWriteState.isFieldUpdate) { + final String formatName = field.getAttribute(PER_FIELD_FORMAT_KEY); + assert formatName != null : "invalid null FORMAT_KEY for field=\"" + field.name + "\" (field updates)"; + format = DocValuesFormat.forName(formatName); + } else { + format = getDocValuesFormatForField(field.name); + } if (format == null) { throw 
new IllegalStateException("invalid null DocValuesFormat for field=\"" + field.name + "\""); } @@ -130,7 +136,7 @@ final String formatName = format.getName(); String previousValue = field.putAttribute(PER_FIELD_FORMAT_KEY, formatName); - assert previousValue == null: "formatName=" + formatName + " prevValue=" + previousValue; + assert segmentWriteState.isFieldUpdate || previousValue == null: "formatName=" + formatName + " prevValue=" + previousValue; Integer suffix; @@ -137,18 +143,23 @@ ConsumerAndSuffix consumer = formats.get(format); if (consumer == null) { // First time we are seeing this format; create a new instance - - // bump the suffix - suffix = suffixes.get(formatName); - if (suffix == null) { - suffix = 0; + + if (segmentWriteState.isFieldUpdate) { + final String suffixAtt = field.getAttribute(PER_FIELD_SUFFIX_KEY); + assert suffixAtt != null : "invalid null SUFFIX_KEY for field=\"" + field.name + "\" (field updates)"; + suffix = Integer.valueOf(suffixAtt); } else { - suffix = suffix + 1; + // bump the suffix + suffix = suffixes.get(formatName); + if (suffix == null) { + suffix = 0; + } else { + suffix = suffix + 1; + } } suffixes.put(formatName, suffix); - final String segmentSuffix = getFullSegmentSuffix(field.name, - segmentWriteState.segmentSuffix, + final String segmentSuffix = getFullSegmentSuffix(segmentWriteState.segmentSuffix, getSuffix(formatName, Integer.toString(suffix))); consumer = new ConsumerAndSuffix(); consumer.consumer = format.fieldsConsumer(new SegmentWriteState(segmentWriteState, segmentSuffix)); @@ -161,10 +172,10 @@ } previousValue = field.putAttribute(PER_FIELD_SUFFIX_KEY, Integer.toString(suffix)); - assert previousValue == null; + assert segmentWriteState.isFieldUpdate || previousValue == null : "suffix=" + Integer.toString(suffix) + " prevValue=" + previousValue; // TODO: we should only provide the "slice" of FIS - // that this DVF actually sees ... 
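A minimal plain-Java sketch of the segment-suffix composition this hunk switches to (the getSuffix/getFullSegmentSuffix pair shown just below). The "Lucene45" format name and the outer suffix "1" are made-up values for illustration, not something produced by the patch:

public class SuffixComposition {

  // mirrors PerFieldDocValuesFormat.getSuffix(formatName, suffix)
  static String getSuffix(String formatName, String suffix) {
    return formatName + "_" + suffix;
  }

  // mirrors the patched getFullSegmentSuffix(outerSegmentSuffix, segmentSuffix):
  // instead of rejecting an outer suffix, the two suffixes are now chained
  static String getFullSegmentSuffix(String outerSegmentSuffix, String segmentSuffix) {
    if (outerSegmentSuffix.length() == 0) {
      return segmentSuffix;
    }
    return outerSegmentSuffix + "_" + segmentSuffix;
  }

  public static void main(String[] args) {
    // plain flush, no outer suffix: "Lucene45_0"
    System.out.println(getFullSegmentSuffix("", getSuffix("Lucene45", "0")));
    // write that already carries an outer suffix (e.g. a field-update write): "1_Lucene45_0"
    System.out.println(getFullSegmentSuffix("1", getSuffix("Lucene45", "0")));
  }
}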
return consumer.consumer; } @@ -179,14 +190,11 @@ return formatName + "_" + suffix; } - static String getFullSegmentSuffix(String fieldName, String outerSegmentSuffix, String segmentSuffix) { + static String getFullSegmentSuffix(String outerSegmentSuffix, String segmentSuffix) { if (outerSegmentSuffix.length() == 0) { return segmentSuffix; } else { - // TODO: support embedding; I think it should work but - // we need a test confirm to confirm - // return outerSegmentSuffix + "_" + segmentSuffix; - throw new IllegalStateException("cannot embed PerFieldPostingsFormat inside itself (field \"" + fieldName + "\" returned PerFieldPostingsFormat)"); + return outerSegmentSuffix + "_" + segmentSuffix; } } @@ -210,7 +218,7 @@ final String suffix = fi.getAttribute(PER_FIELD_SUFFIX_KEY); assert suffix != null; DocValuesFormat format = DocValuesFormat.forName(formatName); - String segmentSuffix = getSuffix(formatName, suffix); + String segmentSuffix = getFullSegmentSuffix(readState.segmentSuffix, getSuffix(formatName, suffix)); if (!formats.containsKey(segmentSuffix)) { formats.put(segmentSuffix, format.fieldsProducer(new SegmentReadState(readState, segmentSuffix))); } Index: lucene/core/src/java/org/apache/lucene/index/BufferedDeletes.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/BufferedDeletes.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/BufferedDeletes.java (working copy) @@ -19,25 +19,26 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.search.Query; import org.apache.lucene.util.RamUsageEstimator; -/* Holds buffered deletes, by docID, term or query for a +/* Holds buffered deletes and updates, by docID, term or query for a * single segment. This is used to hold buffered pending - * deletes against the to-be-flushed segment. Once the - * deletes are pushed (on flush in DocumentsWriter), these - * deletes are converted to a FrozenDeletes instance. */ + * deletes and updates against the to-be-flushed segment. Once the + * deletes and updates are pushed (on flush in DocumentsWriter), they + * are converted to a FrozenDeletes instance. */ // NOTE: instances of this class are accessed either via a private // instance on DocumentWriterPerThread, or via sync'd code by // DocumentsWriterDeleteQueue -class BufferedDeletes { +class BufferedDeletes { // TODO (DVU_RENAME) BufferedUpdates? /* Rough logic: HashMap has an array[Entry] w/ varying load factor (say 2 * POINTER). Entry is object w/ Term @@ -63,11 +64,45 @@ undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */ final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_INT + 24; + /* Rough logic: NumericUpdate calculates its actual size, + * including the update Term and DV field (String). The + * per-term map holds a reference to the update Term, and + * therefore we only account for the object reference and + * map space itself. This is incremented when we first see + * an update Term. + * LinkedHashMap has an array[Entry] w/ varying load factor + * (say 2*POINTER). Entry is an object w/ Term key, Map val, + * int hash, Entry next, Entry before, Entry after (OBJ_HEADER + 5*POINTER + INT). 
+ * Term (key) is counted only as POINTER. + * Map (val) is counted as OBJ_HEADER, array[Entry] ref + header, 4*INT, 1*FLOAT, + * Set (entrySet) (2*OBJ_HEADER + ARRAY_HEADER + 2*POINTER + 4*INT + FLOAT) + */ + final static int BYTES_PER_NUMERIC_UPDATE_TERM_ENTRY = + 9*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + 5*RamUsageEstimator.NUM_BYTES_INT + RamUsageEstimator.NUM_BYTES_FLOAT; + + /* Rough logic: Incremented when we see another field for an already updated + * Term. + * HashMap has an array[Entry] w/ varying load + * factor (say 2*POINTER). Entry is an object w/ String key, + * NumericUpdate val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT). + * NumericUpdate returns its own size, and therefore isn't accounted for here. + */ + final static int BYTES_PER_NUMERIC_UPDATE_ENTRY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT; + final AtomicInteger numTermDeletes = new AtomicInteger(); + final AtomicInteger numNumericUpdates = new AtomicInteger(); final Map terms = new HashMap(); final Map queries = new HashMap(); final List docIDs = new ArrayList(); + // Map> + // LinkedHashMap because we need to preserve the order of the updates. That + // is, if two terms update the same document and same DV field, whoever came + // in last should win. LHM guarantees we iterate on the map in insertion + // order. + final Map> numericUpdates = new LinkedHashMap>(); + public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE); final AtomicLong bytesUsed; @@ -75,21 +110,17 @@ private final static boolean VERBOSE_DELETES = false; long gen; + public BufferedDeletes() { - this(new AtomicLong()); + this.bytesUsed = new AtomicLong(); } - BufferedDeletes(AtomicLong bytesUsed) { - assert bytesUsed != null; - this.bytesUsed = bytesUsed; - } - @Override public String toString() { if (VERBOSE_DELETES) { return "gen=" + gen + " numTerms=" + numTermDeletes + ", terms=" + terms - + ", queries=" + queries + ", docIDs=" + docIDs + ", bytesUsed=" - + bytesUsed; + + ", queries=" + queries + ", docIDs=" + docIDs + ", numericUpdates=" + numericUpdates + + ", bytesUsed=" + bytesUsed; } else { String s = "gen=" + gen; if (numTermDeletes.get() != 0) { @@ -101,6 +132,9 @@ if (docIDs.size() != 0) { s += " " + docIDs.size() + " deleted docIDs"; } + if (numNumericUpdates.get() != 0) { + s += " " + numNumericUpdates.get() + " numeric updates (unique count=" + numericUpdates.size() + ")"; + } if (bytesUsed.get() != 0) { s += " bytesUsed=" + bytesUsed.get(); } @@ -145,20 +179,41 @@ } } + public void addNumericUpdate(NumericUpdate update, int docIDUpto) { + Map termUpdates = numericUpdates.get(update.term); + if (termUpdates == null) { + termUpdates = new HashMap(); + numericUpdates.put(update.term, termUpdates); + bytesUsed.addAndGet(BYTES_PER_NUMERIC_UPDATE_TERM_ENTRY); + } + final NumericUpdate current = termUpdates.get(update.field); + if (current != null && docIDUpto < current.docIDUpto) { + // Only record the new number if it's greater than or equal to the current + // one. This is important because if multiple threads are replacing the + // same doc at nearly the same time, it's possible that one thread that + // got a higher docID is scheduled before the other threads. 
+ return; + } + + update.docIDUpto = docIDUpto; + termUpdates.put(update.field, update); + numNumericUpdates.incrementAndGet(); + if (current == null) { + bytesUsed.addAndGet(BYTES_PER_NUMERIC_UPDATE_ENTRY + update.sizeInBytes()); + } + } + void clear() { terms.clear(); queries.clear(); docIDs.clear(); + numericUpdates.clear(); numTermDeletes.set(0); + numNumericUpdates.set(0); bytesUsed.set(0); } - void clearDocIDs() { - bytesUsed.addAndGet(-docIDs.size()*BYTES_PER_DEL_DOCID); - docIDs.clear(); - } - boolean any() { - return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0; + return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0 || numericUpdates.size() > 0; } } Index: lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (working copy) @@ -18,10 +18,11 @@ */ import java.io.IOException; -import java.util.List; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; -import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -35,12 +36,12 @@ /* Tracks the stream of {@link BufferedDeletes}. * When DocumentsWriterPerThread flushes, its buffered - * deletes are appended to this stream. We later - * apply these deletes (resolve them to the actual + * deletes and updates are appended to this stream. We later + * apply them (resolve them to the actual * docIDs, per segment) when a merge is started * (only to the to-be-merged segments). We * also apply to all segments when NRT reader is pulled, - * commit/close is called, or when too many deletes are + * commit/close is called, or when too many deletes or updates are * buffered and must be flushed (by RAM usage or by count). * * Each packet is assigned a generation, and each flushed or @@ -48,7 +49,7 @@ * track which BufferedDeletes packets to apply to any given * segment. */ -class BufferedDeletesStream { +class BufferedDeletesStream { // TODO (DVU_RENAME) BufferedUpdatesStream // TODO: maybe linked list? 
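A stand-alone model of the buffered numeric-update bookkeeping added above (addNumericUpdate and the insertion-ordered numericUpdates map). This is not Lucene code: Term keys are replaced by plain strings and NumericUpdate by a bare Long, just to show why LinkedHashMap is used for the outer map:

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class BufferedNumericUpdatesModel {

  // outer map keyed by the update "term" (a plain String here), insertion-ordered;
  // inner map keyed by the DocValues field name
  static final Map<String, Map<String, Long>> updatesByTerm =
      new LinkedHashMap<String, Map<String, Long>>();

  static void addNumericUpdate(String term, String field, Long value) {
    Map<String, Long> byField = updatesByTerm.get(term);
    if (byField == null) {
      byField = new HashMap<String, Long>();
      updatesByTerm.put(term, byField); // first update buffered for this term
    }
    byField.put(field, value); // a later value for the same term+field replaces the earlier one
  }

  public static void main(String[] args) {
    addNumericUpdate("id:1", "price", 10L);
    addNumericUpdate("id:2", "price", 20L);
    addNumericUpdate("id:1", "price", 15L); // same term+field again: inner map value is replaced
    // prints id:1 before id:2 -- LinkedHashMap iterates in insertion order,
    // which is what lets later updates win when they are replayed at flush time
    for (Map.Entry<String, Map<String, Long>> e : updatesByTerm.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}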
private final List deletes = new ArrayList(); @@ -114,6 +115,7 @@ } public static class ApplyDeletesResult { + // True if any actual deletes took place: public final boolean anyDeletes; @@ -123,10 +125,14 @@ // If non-null, contains segments that are 100% deleted public final List allDeleted; - ApplyDeletesResult(boolean anyDeletes, long gen, List allDeleted) { + // True if any actual numeric docvalues updates took place + public final boolean anyNumericDVUpdates; + + ApplyDeletesResult(boolean anyDeletes, long gen, List allDeleted, boolean anyNumericDVUpdates) { this.anyDeletes = anyDeletes; this.gen = gen; this.allDeleted = allDeleted; + this.anyNumericDVUpdates = anyNumericDVUpdates; } } @@ -145,7 +151,7 @@ final long t0 = System.currentTimeMillis(); if (infos.size() == 0) { - return new ApplyDeletesResult(false, nextGen++, null); + return new ApplyDeletesResult(false, nextGen++, null, false); } assert checkDeleteStats(); @@ -154,7 +160,7 @@ if (infoStream.isEnabled("BD")) { infoStream.message("BD", "applyDeletes: no deletes; skipping"); } - return new ApplyDeletesResult(false, nextGen++, null); + return new ApplyDeletesResult(false, nextGen++, null, false); } if (infoStream.isEnabled("BD")) { @@ -169,6 +175,7 @@ CoalescedDeletes coalescedDeletes = null; boolean anyNewDeletes = false; + boolean anyNewUpdates = false; int infosIDX = infos2.size()-1; int delIDX = deletes.size()-1; @@ -183,7 +190,7 @@ final long segGen = info.getBufferedDeletesGen(); if (packet != null && segGen < packet.delGen()) { - //System.out.println(" coalesce"); +// System.out.println(" coalesce"); if (coalescedDeletes == null) { coalescedDeletes = new CoalescedDeletes(); } @@ -206,7 +213,7 @@ // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); final ReadersAndLiveDocs rld = readerPool.get(info, true); - final SegmentReader reader = rld.getReader(IOContext.READ); + final SegmentReader reader = rld.getReader(false, IOContext.READ); // don't apply deletes, as we're about to add more! int delCount = 0; final boolean segAllDeletes; try { @@ -214,11 +221,13 @@ //System.out.println(" del coalesced"); delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); + anyNewUpdates |= applyNumericDocValueUpdates(coalescedDeletes.numericDVUpdates, rld, reader); } //System.out.println(" del exact"); // Don't delete by Term here; DocumentsWriterPerThread // already did that on flush: delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader); + anyNewUpdates |= applyNumericDocValueUpdates(Arrays.asList(packet.updates), rld, reader); final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); segAllDeletes = fullDelCount == rld.info.info.getDocCount(); @@ -259,12 +268,13 @@ // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); final ReadersAndLiveDocs rld = readerPool.get(info, true); - final SegmentReader reader = rld.getReader(IOContext.READ); + final SegmentReader reader = rld.getReader(false, IOContext.READ); // don't apply deletes, as we're about to add more! 
int delCount = 0; final boolean segAllDeletes; try { delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); + anyNewUpdates |= applyNumericDocValueUpdates(coalescedDeletes.numericDVUpdates, rld, reader); final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); segAllDeletes = fullDelCount == rld.info.info.getDocCount(); @@ -297,7 +307,7 @@ } // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any; - return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted); + return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted, anyNewUpdates); } synchronized long getNextGen() { @@ -402,15 +412,15 @@ if (docID == DocIdSetIterator.NO_MORE_DOCS) { break; } + if (!any) { + rld.initWritableLiveDocs(); + any = true; + } // NOTE: there is no limit check on the docID // when deleting by Term (unlike by Query) // because on flush we apply all Term deletes to // each segment. So all Term deleting here is // against prior segments: - if (!any) { - rld.initWritableLiveDocs(); - any = true; - } if (rld.delete(docID)) { delCount++; } @@ -422,6 +432,65 @@ return delCount; } + // NumericDocValue Updates + private synchronized boolean applyNumericDocValueUpdates(Iterable updates, ReadersAndLiveDocs rld, SegmentReader reader) throws IOException { + Fields fields = reader.fields(); + if (fields == null) { + // This reader has no postings + return false; + } + + TermsEnum termsEnum = null; + DocsEnum docs = null; + boolean any = false; + //System.out.println(Thread.currentThread().getName() + " numericDVUpdate reader=" + reader); + for (NumericUpdate update : updates) { + Term term = update.term; + int limit = update.docIDUpto; + + // TODO: we rely on the map being ordered by updates order, not by terms order. + // we need that so that if two terms update the same document, the one that came + // last wins. + // alternatively, we could keep a map from doc->lastUpto and apply the update + // in terms order, where an update is applied only if its docIDUpto is greater + // than lastUpto. + // but, since app can send two updates, in order, which will have same upto, we + // cannot rely solely on docIDUpto, and need to have our own gen, which is + // incremented with every update. + + // Unlike applyTermDeletes, we visit terms in update order, not term order. 
+ // Therefore we cannot assume we can only seek forwards and must ask for a + // new TermsEnum + Terms terms = fields.terms(term.field); + if (terms == null) { // no terms in that field + termsEnum = null; + continue; + } + + termsEnum = terms.iterator(termsEnum); + + // System.out.println(" term=" + term); + + if (termsEnum.seekExact(term.bytes())) { + // we don't need term frequencies for this + DocsEnum docsEnum = termsEnum.docs(rld.getLiveDocs(), docs, DocsEnum.FLAG_NONE); + + //System.out.println("BDS: got docsEnum=" + docsEnum); + + int doc; + while ((doc = docsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + //System.out.println(Thread.currentThread().getName() + " numericDVUpdate term=" + term + " doc=" + docID); + if (doc >= limit) { + break; // no more docs that can be updated for this term + } + rld.updateNumericDocValue(update.field, doc, update.value); + any = true; + } + } + } + return any; + } + public static class QueryAndLimit { public final Query query; public final int limit; Index: lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java (working copy) @@ -29,20 +29,27 @@ class CoalescedDeletes { final Map queries = new HashMap(); final List> iterables = new ArrayList>(); - + final List numericDVUpdates = new ArrayList(); + @Override public String toString() { // note: we could add/collect more debugging information - return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ")"; + return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ",numericUpdates=" + numericDVUpdates.size() + ")"; } void update(FrozenBufferedDeletes in) { iterables.add(in.termsIterable()); - for(int queryIdx=0;queryIdx termsIterable() { Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) @@ -158,6 +158,15 @@ return applyAllDeletes( deleteQueue); } + synchronized void updateNumericDocValue(Term term, String field, Long value) throws IOException { + final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; + deleteQueue.addNumericUpdate(new NumericUpdate(term, field, value)); + flushControl.doOnDelete(); + if (flushControl.doApplyAllDeletes()) { + applyAllDeletes(deleteQueue); + } + } + DocumentsWriterDeleteQueue currentDeleteSession() { return deleteQueue; } Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (working copy) @@ -107,6 +107,11 @@ tryApplyGlobalSlice(); } + void addNumericUpdate(NumericUpdate update) { + add(new NumericUpdateNode(update)); + tryApplyGlobalSlice(); + } + /** * invariant for document update */ @@ -380,7 +385,23 @@ } } + private static final class NumericUpdateNode extends Node { + NumericUpdateNode(NumericUpdate update) { + super(update); + } + + @Override + void apply(BufferedDeletes bufferedDeletes, int docIDUpto) { + 
bufferedDeletes.addNumericUpdate(item, docIDUpto); + } + + @Override + public String toString() { + return "update=" + item; + } + } + private boolean forceApplyGlobalSlice() { globalBufferLock.lock(); final Node currentTail = tail; Index: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (working copy) @@ -17,9 +17,11 @@ * limitations under the License. */ +import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK; +import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE; + import java.io.IOException; import java.text.NumberFormat; -import java.util.Collection; import java.util.HashSet; import java.util.Locale; import java.util.Set; @@ -41,9 +43,6 @@ import org.apache.lucene.util.MutableBits; import org.apache.lucene.util.RamUsageEstimator; -import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK; -import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE; - class DocumentsWriterPerThread { /** @@ -174,8 +173,9 @@ final DocConsumer consumer; final Counter bytesUsed; - //Deletes for our still-in-RAM (to be flushed next) segment - final BufferedDeletes pendingDeletes; + SegmentWriteState flushState; + // Deletes for our still-in-RAM (to be flushed next) segment + final BufferedDeletes pendingDeletes; private final SegmentInfo segmentInfo; // Current segment we are working on boolean aborting = false; // True if an abort is pending boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting @@ -467,7 +467,7 @@ pendingDeletes.terms.clear(); segmentInfo.setFiles(new HashSet(directory.getCreatedFiles())); - final SegmentInfoPerCommit segmentInfoPerCommit = new SegmentInfoPerCommit(segmentInfo, 0, -1L); + final SegmentInfoPerCommit segmentInfoPerCommit = new SegmentInfoPerCommit(segmentInfo, 0, -1L, -1L); if (infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : (flushState.segmentInfo.getDocCount() - flushState.delCountOnFlush)) + " deleted docs"); infoStream.message("DWPT", "new segment has " + @@ -481,7 +481,8 @@ } final BufferedDeletes segmentDeletes; - if (pendingDeletes.queries.isEmpty()) { + if (pendingDeletes.queries.isEmpty() && pendingDeletes.numericUpdates.isEmpty()) { + pendingDeletes.clear(); segmentDeletes = null; } else { segmentDeletes = pendingDeletes; Index: lucene/core/src/java/org/apache/lucene/index/FieldInfo.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/FieldInfo.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/FieldInfo.java (working copy) @@ -79,7 +79,7 @@ * Character offsets are encoded alongside the positions. */ DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, - }; + } /** * DocValues types. @@ -110,7 +110,7 @@ * ordinal and by-value. Values must be <= 32766 bytes. */ SORTED_SET - }; + } /** * Sole Constructor. 
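The applyNumericDocValueUpdates loop added to BufferedDeletesStream earlier follows the usual 4.x postings walk: seek the update term, then visit its documents up to the update's docIDUpto. A condensed sketch of just that walk, written against a bare AtomicReader rather than the ReadersAndLiveDocs machinery (the actual updateNumericDocValue call is only indicated in a comment):

import java.io.IOException;

import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;

class TermDocsWalk {

  /** Returns how many documents matching {@code term} fall below {@code docIDUpto}. */
  static int walk(AtomicReader reader, Term term, int docIDUpto) throws IOException {
    Fields fields = reader.fields();
    if (fields == null) {
      return 0; // reader has no postings at all
    }
    Terms terms = fields.terms(term.field());
    if (terms == null) {
      return 0; // no postings for this field
    }
    TermsEnum termsEnum = terms.iterator(null);
    int visited = 0;
    if (termsEnum.seekExact(term.bytes())) {
      // frequencies are not needed, so ask for the cheapest enum
      DocsEnum docs = termsEnum.docs(reader.getLiveDocs(), null, DocsEnum.FLAG_NONE);
      int doc;
      while ((doc = docs.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
        if (doc >= docIDUpto) {
          break; // documents added after the update was buffered must not be touched
        }
        visited++; // this is where the patch calls rld.updateNumericDocValue(field, doc, value)
      }
    }
    return visited;
  }
}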
Index: lucene/core/src/java/org/apache/lucene/index/FieldInfos.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/FieldInfos.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/FieldInfos.java (working copy) @@ -223,6 +223,20 @@ (dvType == null || docValuesType.get(name) == null || dvType == docValuesType.get(name)); } + /** + * Returns true if the {@code fieldName} exists in the map and is of the + * same {@code dvType}. + */ + synchronized boolean contains(String fieldName, DocValuesType dvType) { + // used by IndexWriter.updateNumericDocValue + if (!nameToNumber.containsKey(fieldName)) { + return false; + } else { + // only return true if the field has the same dvType as the requested one + return dvType == docValuesType.get(fieldName); + } + } + synchronized void clear() { numberToName.clear(); nameToNumber.clear(); Index: lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (working copy) @@ -17,32 +17,38 @@ * limitations under the License. */ +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; +import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit; import org.apache.lucene.search.Query; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.RamUsageEstimator; -import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit; -/** Holds buffered deletes by term or query, once pushed. - * Pushed deletes are write-once, so we shift to more - * memory efficient data structure to hold them. We don't - * hold docIDs because these are applied on flush. */ +/** + * Holds buffered deletes and updates by term or query, once pushed. Pushed + * deletes/updates are write-once, so we shift to more memory efficient data + * structure to hold them. We don't hold docIDs because these are applied on + * flush. + */ +class FrozenBufferedDeletes { // TODO (DVU_RENAME) FrozenBufferedUpdates? -class FrozenBufferedDeletes { - /* Query we often undercount (say 24 bytes), plus int. 
*/ final static int BYTES_PER_DEL_QUERY = RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_INT + 24; - + // Terms, in sorted order: final PrefixCodedTerms terms; int termCount; // just for debugging - // Parallel array of deleted query, and the docIDUpto for - // each + // Parallel array of deleted query, and the docIDUpto for each final Query[] queries; final int[] queryLimits; + + // numeric DV update term and their updates + final NumericUpdate[] updates; + final int bytesUsed; final int numTermDeletes; private long gen = -1; // assigned by BufferedDeletesStream once pushed @@ -72,7 +78,17 @@ upto++; } - bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY; + List allUpdates = new ArrayList(); + int numericUpdatesSize = 0; + for (Map fieldUpdates : deletes.numericUpdates.values()) { + for (NumericUpdate update : fieldUpdates.values()) { + allUpdates.add(update); + numericUpdatesSize += update.sizeInBytes(); + } + } + updates = allUpdates.toArray(new NumericUpdate[allUpdates.size()]); + + bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY + numericUpdatesSize + updates.length * RamUsageEstimator.NUM_BYTES_OBJECT_REF; numTermDeletes = deletes.numTermDeletes.get(); } @@ -140,6 +156,6 @@ } boolean any() { - return termCount > 0 || queries.length > 0; + return termCount > 0 || queries.length > 0 || updates.length > 0; } } Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -36,6 +36,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.FieldInfo.DocValuesType; import org.apache.lucene.index.FieldInfos.FieldNumbers; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.index.MergePolicy.MergeTrigger; @@ -459,10 +460,10 @@ // This is the last ref to this RLD, and we're not // pooling, so remove it: if (rld.writeLiveDocs(directory)) { - // Make sure we only write del docs for a live segment: + // Make sure we only write del docs and field updates for a live segment: assert infoIsLive(rld.info); // Must checkpoint w/ deleter, because we just - // created created new _X_N.del file. + // created new _X_N.del and field updates files. deleter.checkpoint(segmentInfos, false); } @@ -481,10 +482,10 @@ try { if (doSave && rld.writeLiveDocs(directory)) { - // Make sure we only write del docs for a live segment: + // Make sure we only write del docs and field updates for a live segment: assert infoIsLive(rld.info); // Must checkpoint w/ deleter, because we just - // created created new _X_N.del file. + // created created new _X_N.del and field updates files. deleter.checkpoint(segmentInfos, false); } } catch (Throwable t) { @@ -529,11 +530,15 @@ if (rld != null) { assert rld.info == info; if (rld.writeLiveDocs(directory)) { - // Make sure we only write del docs for a live segment: + // Make sure we only write del docs and updates for a live segment: assert infoIsLive(info); + // Must checkpoint w/ deleter, because we just - // created created new _X_N.del file. + // created new _X_N.del and field updates files. 
deleter.checkpoint(segmentInfos, false); + + // we wrote liveDocs and field updates, reopen the reader + rld.reopenReader(IOContext.READ); } } } @@ -1532,6 +1537,48 @@ } } + /** + * Updates a document's NumericDocValue for field to the given + * value. This method can be used to 'unset' a document's value + * by passing {@code null} as the new value. Also, you can only update fields + * that already exist in the index, not add new fields through this method. + * + *
+ * NOTE: it is currently not allowed to update the value of documents + * in a segment where the field does not exist (even though it may exist in + * other segments). If you try that, you will hit an + * {@link UnsupportedOperationException} when the segment is later flushed + * (following an NRT reader reopen, commit, forceMerge etc.). + * + *
+ * NOTE: if this method hits an OutOfMemoryError you should immediately + * close the writer. See above for details. + *
+ * + * @param term + * the term to identify the document(s) to be updated + * @param field + * field name of the NumericDocValues field + * @param value + * new value for the field + * @throws CorruptIndexException + * if the index is corrupt + * @throws IOException + * if there is a low-level IO error + */ + // TODO (DVU_FIELDINFOS_GEN) remove the paragraph on updating segments without the field not allowed + public void updateNumericDocValue(Term term, String field, Long value) throws IOException { + ensureOpen(); + if (!globalFieldNumberMap.contains(field, DocValuesType.NUMERIC)) { + throw new IllegalArgumentException("can only update existing numeric-docvalues fields!"); + } + try { + docWriter.updateNumericDocValue(term, field, value); + } catch (OutOfMemoryError oom) { + handleOOM(oom, "updateNumericDocValue"); + } + } + // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); @@ -1915,7 +1962,6 @@ merge.maxNumSegments = maxNumSegments; } } - } else { spec = mergePolicy.findMerges(trigger, segmentInfos); } @@ -2512,7 +2558,7 @@ } } - SegmentInfoPerCommit infoPerCommit = new SegmentInfoPerCommit(info, 0, -1L); + SegmentInfoPerCommit infoPerCommit = new SegmentInfoPerCommit(info, 0, -1L, -1L); info.setFiles(new HashSet(trackingDir.getCreatedFiles())); trackingDir.getCreatedFiles().clear(); @@ -2599,7 +2645,7 @@ SegmentInfo newInfo = new SegmentInfo(directory, info.info.getVersion(), segName, info.info.getDocCount(), info.info.getUseCompoundFile(), info.info.getCodec(), info.info.getDiagnostics(), attributes); - SegmentInfoPerCommit newInfoPerCommit = new SegmentInfoPerCommit(newInfo, info.getDelCount(), info.getDelGen()); + SegmentInfoPerCommit newInfoPerCommit = new SegmentInfoPerCommit(newInfo, info.getDelCount(), info.getDelGen(), info.getDocValuesGen()); Set segFiles = new HashSet(); @@ -3011,7 +3057,7 @@ flushDeletesCount.incrementAndGet(); final BufferedDeletesStream.ApplyDeletesResult result; result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos.asList()); - if (result.anyDeletes) { + if (result.anyDeletes || result.anyNumericDVUpdates) { checkpoint(); } if (!keepFullyDeletedSegments && result.allDeleted != null) { @@ -3063,15 +3109,23 @@ } } - /** Carefully merges deletes for the segments we just - * merged. This is tricky because, although merging will - * clear all deletes (compacts the documents), new - * deletes may have been flushed to the segments since - * the merge was started. This method "carries over" - * such new deletes onto the newly merged segment, and - * saves the resulting deletes file (incrementing the - * delete generation for merge.info). If no deletes were - * flushed, no new deletes file is saved. */ + private MergePolicy.DocMap getDocMap(MergePolicy.OneMerge merge, MergeState mergeState) { + MergePolicy.DocMap docMap = merge.getDocMap(mergeState); + assert docMap.isConsistent(merge.info.info.getDocCount()); + return docMap; + } + + /** + * Carefully merges deletes and updates for the segments we just merged. This + * is tricky because, although merging will clear all deletes (compacts the + * documents) and compact all the updates, new deletes and updates may have + * been flushed to the segments since the merge was started. This method + * "carries over" such new deletes and updates onto the newly merged segment, + * and saves the resulting deletes and updates files (incrementing the delete + * and DV generations for merge.info). If no deletes were flushed, no new + * deletes file is saved. 
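A usage sketch for the new IndexWriter.updateNumericDocValue API introduced above. The directory, analyzer, Version constant and field names are illustrative choices, not part of the patch; the one real requirement shown is that the target field must already exist as a NumericDocValues field:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MultiDocValues;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class UpdateNumericDocValueExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir,
        new IndexWriterConfig(Version.LUCENE_44, new StandardAnalyzer(Version.LUCENE_44)));

    Document doc = new Document();
    doc.add(new StringField("id", "doc-1", Store.NO));
    doc.add(new NumericDocValuesField("price", 10L)); // the field must exist before it can be updated
    writer.addDocument(doc);
    writer.commit();

    // In-place update: the document is not re-indexed, only its DocValues entry changes.
    writer.updateNumericDocValue(new Term("id", "doc-1"), "price", 15L);
    writer.close(); // applies the buffered update

    DirectoryReader reader = DirectoryReader.open(dir);
    NumericDocValues prices = MultiDocValues.getNumericValues(reader, "price");
    System.out.println("price of doc 0 = " + prices.get(0)); // prints 15
    reader.close();
    dir.close();
  }
}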
+ */ + // TODO (DVU_RENAME) to commitMergedDeletesAndUpdates synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException { assert testPoint("startCommitMergeDeletes"); @@ -3088,20 +3142,22 @@ long minGen = Long.MAX_VALUE; // Lazy init (only when we find a delete to carry over): - ReadersAndLiveDocs mergedDeletes = null; + ReadersAndLiveDocs mergedDeletes = null; // TODO (DVU_RENAME) to mergedDeletesAndUpdates + boolean initWritableLiveDocs = false; MergePolicy.DocMap docMap = null; - - for(int i=0; i < sourceSegments.size(); i++) { + final Map> mergedUpdates = new HashMap>(); + + for (int i = 0; i < sourceSegments.size(); i++) { SegmentInfoPerCommit info = sourceSegments.get(i); minGen = Math.min(info.getBufferedDeletesGen(), minGen); final int docCount = info.info.getDocCount(); final Bits prevLiveDocs = merge.readers.get(i).getLiveDocs(); - final Bits currentLiveDocs; final ReadersAndLiveDocs rld = readerPool.get(info, false); // We hold a ref so it should still be in the pool: assert rld != null: "seg=" + info.info.name; - currentLiveDocs = rld.getLiveDocs(); - + final Bits currentLiveDocs = rld.getLiveDocs(); + final Map> mergingUpdates = rld.getMergingUpdates(); + if (prevLiveDocs != null) { // If we had deletions on starting the merge we must @@ -3123,11 +3179,10 @@ // If so, we must carefully merge the liveDocs one // doc at a time: if (currentLiveDocs != prevLiveDocs) { - // This means this segment received new deletes // since we started the merge, so we // must merge them: - for(int j=0;j docUpdates = mergingUpdates.get(Integer.valueOf(j)); + if (docUpdates != null) { + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + docMap = getDocMap(merge, mergeState); + } + mergedUpdates.put(Integer.valueOf(docMap.map(docUpto)), docUpdates); + } } docUpto++; } } + } else if (mergingUpdates != null) { + // need to check each non-deleted document if it has any updates + for (int j = 0; j < docCount; j++) { + if (prevLiveDocs.get(j)) { + // document isn't deleted, check if it has updates + Map docUpdates = mergingUpdates.get(Integer.valueOf(j)); + if (docUpdates != null) { + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + docMap = getDocMap(merge, mergeState); + } + mergedUpdates.put(Integer.valueOf(docMap.map(docUpto)), docUpdates); + } + // advance docUpto for every non-deleted document + docUpto++; + } + } } else { docUpto += info.info.getDocCount() - info.getDelCount() - rld.getPendingDeleteCount(); } @@ -3150,18 +3235,45 @@ assert currentLiveDocs.length() == docCount; // This segment had no deletes before but now it // does: - for(int j=0; j docUpdates = mergingUpdates.get(Integer.valueOf(j)); + if (docUpdates != null) { + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + docMap = getDocMap(merge, mergeState); + } + mergedUpdates.put(Integer.valueOf(docMap.map(docUpto)), docUpdates); + } } docUpto++; } + } else if (mergingUpdates != null) { + // no deletions before or after, but there were updates + for (int j = 0; j < docCount; j++) { + Map docUpdates = mergingUpdates.get(Integer.valueOf(j)); + if (docUpdates != null) { + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + docMap = getDocMap(merge, mergeState); + } + mergedUpdates.put(Integer.valueOf(docMap.map(docUpto)), docUpdates); + } + // advance docUpto for every non-deleted document + docUpto++; + } } else { // No deletes before or 
after docUpto += info.info.getDocCount(); @@ -3170,11 +3282,18 @@ assert docUpto == merge.info.info.getDocCount(); + // set any updates that came while the segment was merging + if (!mergedUpdates.isEmpty()) { + assert mergedDeletes != null; + mergedDeletes.setMergingUpdates(mergedUpdates); + } + if (infoStream.isEnabled("IW")) { if (mergedDeletes == null) { - infoStream.message("IW", "no new deletes since merge started"); + infoStream.message("IW", "no new deletes or field updates since merge started"); } else { - infoStream.message("IW", mergedDeletes.getPendingDeleteCount() + " new deletes since merge started"); + infoStream.message("IW", mergedDeletes.getPendingDeleteCount() + " new deletes since merge started and " + + mergedDeletes.getPendingUpdatesCount() + " new field updates since merge started"); } } @@ -3213,7 +3332,7 @@ final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState); - assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0; + assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0 || mergedDeletes.hasFieldUpdates(); // If the doc store we are using has been closed and // is in now compound format (but wasn't when we @@ -3527,8 +3646,8 @@ // Lock order: IW -> BD final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, merge.segments); - - if (result.anyDeletes) { + + if (result.anyDeletes || result.anyNumericDVUpdates) { checkpoint(); } @@ -3556,8 +3675,10 @@ details.put("mergeMaxNumSegments", "" + merge.maxNumSegments); details.put("mergeFactor", Integer.toString(merge.segments.size())); setDiagnostics(si, SOURCE_MERGE, details); - merge.setInfo(new SegmentInfoPerCommit(si, 0, -1L)); + merge.setInfo(new SegmentInfoPerCommit(si, 0, -1L, -1L)); +// System.out.println("[" + Thread.currentThread().getName() + "] _mergeInit: " + segString(merge.segments) + " into " + si); + // Lock order: IW -> BD bufferedDeletesStream.prune(segmentInfos); @@ -3598,7 +3719,7 @@ // exception inside mergeInit if (merge.registerDone) { final List sourceSegments = merge.segments; - for(SegmentInfoPerCommit info : sourceSegments) { + for (SegmentInfoPerCommit info : sourceSegments) { mergingSegments.remove(info); } merge.registerDone = false; @@ -3623,6 +3744,7 @@ if (drop) { rld.dropChanges(); } + rld.setMerging(false); rld.release(sr); readerPool.release(rld); if (drop) { @@ -3680,9 +3802,13 @@ // Hold onto the "live" reader; we will use this to // commit merged deletes final ReadersAndLiveDocs rld = readerPool.get(info, true); - SegmentReader reader = rld.getReader(context); + SegmentReader reader = rld.getReader(true, context); assert reader != null; + // Notify that we are merging, so that we can later copy the updates + // that were received while merging to the merged segment. 
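The remapping done in this area is easy to get wrong, so here is a stand-alone illustration (plain Java, not Lucene code) of the docID translation that commitMergedDeletes performs: updates are buffered against old docIDs, the merge compacts away deleted documents, and every carried-over update has to be re-keyed through the old-to-new doc map:

import java.util.HashMap;
import java.util.Map;

public class MergeDocRemapExample {
  public static void main(String[] args) {
    boolean[] liveDocs = { true, false, true, true }; // doc 1 was deleted before the merge
    Map<Integer, Long> updatesByOldDoc = new HashMap<Integer, Long>();
    updatesByOldDoc.put(2, 42L); // numeric update that arrived while the merge was running

    // build the old->new doc map: deleted docs are dropped, live docs are compacted
    int[] oldToNew = new int[liveDocs.length];
    int newDoc = 0;
    for (int oldDoc = 0; oldDoc < liveDocs.length; oldDoc++) {
      oldToNew[oldDoc] = liveDocs[oldDoc] ? newDoc++ : -1;
    }

    // carry the update over to the merged segment under its new docID
    Map<Integer, Long> updatesByNewDoc = new HashMap<Integer, Long>();
    for (Map.Entry<Integer, Long> e : updatesByOldDoc.entrySet()) {
      int mapped = oldToNew[e.getKey()];
      if (mapped != -1) { // updates against deleted docs are simply dropped
        updatesByNewDoc.put(mapped, e.getValue());
      }
    }
    System.out.println(updatesByNewDoc); // {1=42}: old doc 2 became doc 1 after compaction
  }
}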
+ rld.setMerging(true); + // Carefully pull the most recent live docs: final Bits liveDocs; final int delCount; @@ -3715,7 +3841,7 @@ // fix the reader's live docs and del count assert delCount > reader.numDeletedDocs(); // beware of zombies - SegmentReader newReader = new SegmentReader(info, reader.core, liveDocs, info.info.getDocCount() - delCount); + SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.getDocCount() - delCount); boolean released = false; try { rld.release(reader); @@ -3877,7 +4003,7 @@ final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer(); if (poolReaders && mergedSegmentWarmer != null && merge.info.info.getDocCount() != 0) { final ReadersAndLiveDocs rld = readerPool.get(merge.info, true); - final SegmentReader sr = rld.getReader(IOContext.READ); + final SegmentReader sr = rld.getReader(true, IOContext.READ); try { mergedSegmentWarmer.warm(sr); } finally { Index: lucene/core/src/java/org/apache/lucene/index/MergeState.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/MergeState.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/MergeState.java (working copy) @@ -105,11 +105,11 @@ } - private static class NoDelDocMap extends DocMap { + private static final class NoDelDocMap extends DocMap { private final int maxDoc; - private NoDelDocMap(int maxDoc) { + NoDelDocMap(int maxDoc) { this.maxDoc = maxDoc; } Index: lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java (working copy) @@ -0,0 +1,68 @@ +package org.apache.lucene.index; + +import static org.apache.lucene.util.RamUsageEstimator.*; + +import org.apache.lucene.document.NumericDocValuesField; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** An in-place update to a numeric docvalues field */ +final class NumericUpdate { + + /* Rough logic: OBJ_HEADER + 3*PTR + INT + * Term: OBJ_HEADER + 2*PTR + * Term.field: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR + * Term.bytes: 2*OBJ_HEADER + 2*INT + PTR + bytes.length + * String: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR + * Long: OBJ_HEADER + LONG + */ + private static final int RAW_SIZE_IN_BYTES = 9*NUM_BYTES_OBJECT_HEADER + 8*NUM_BYTES_OBJECT_REF + 8*NUM_BYTES_INT + NUM_BYTES_LONG; + + static final Long MISSING = new Long(0); + + Term term; + String field; + Long value; + int docIDUpto = -1; // unassigned until applied, and confusing that it's here, when it's just used in BufferedDeletes... + + /** + * Constructor. 
+ * + * @param term the {@link Term} which determines the documents that will be updated + * @param field the {@link NumericDocValuesField} to update + * @param value the updated value + */ + NumericUpdate(Term term, String field, Long value) { + this.term = term; + this.field = field; + this.value = value == null ? MISSING : value; + } + + int sizeInBytes() { + int sizeInBytes = RAW_SIZE_IN_BYTES; + sizeInBytes += term.field.length() * NUM_BYTES_CHAR; + sizeInBytes += term.bytes.bytes.length; + sizeInBytes += field.length() * NUM_BYTES_CHAR; + return sizeInBytes; + } + + @Override + public String toString() { + return "term=" + term + ",field=" + field + ",value=" + value; + } +} Index: lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (working copy) @@ -18,19 +18,29 @@ */ import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.LiveDocsFormat; +import org.apache.lucene.index.FieldInfo.DocValuesType; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.MutableBits; // Used by IndexWriter to hold open SegmentReaders (for -// searching or merging), plus pending deletes, +// searching or merging), plus pending deletes and updates, // for a given segment -class ReadersAndLiveDocs { +class ReadersAndLiveDocs { // TODO (DVU_RENAME) to ReaderAndUpdates // Not final because we replace (clone) when we need to // change it and it's been shared: public final SegmentInfoPerCommit info; @@ -43,13 +53,16 @@ // Set once (null, and then maybe set, and never set again): private SegmentReader reader; - // Holds the current shared (readable and writable - // liveDocs). This is null when there are no deleted + // Holds the current shared (readable and writable) + // liveDocs. This is null when there are no deleted // docs, and it's copy-on-write (cloned whenever we need // to change it but it's been shared to an external NRT // reader). private Bits liveDocs; + // Holds the numeric DocValues updates. + private final Map> numericUpdates = new HashMap>(); + // How many further deletions we've done against // liveDocs vs when we loaded it or last wrote it: private int pendingDeleteCount; @@ -56,12 +69,22 @@ // True if the current liveDocs is referenced by an // external NRT reader: - private boolean shared; + private boolean liveDocsShared; + // Indicates whether this segment is currently being merged. While a segment + // is merging, all field updates are also registered in the mergingUpdates + // map. Also, calls to writeLiveDocs merge the updates with mergingUpdates. + // That way, when the segment is done merging, IndexWriter can apply the + // updates on the merged segment too. + private boolean isMerging = false; + + // Holds any updates that come through while this segment was being merged. 
+ private final Map> mergingUpdates = new HashMap>(); + public ReadersAndLiveDocs(IndexWriter writer, SegmentInfoPerCommit info) { this.info = info; this.writer = writer; - shared = true; + liveDocsShared = true; } public void incRef() { @@ -83,7 +106,19 @@ public synchronized int getPendingDeleteCount() { return pendingDeleteCount; } - + + public synchronized boolean hasFieldUpdates() { + return numericUpdates.size() > 0; + } + + public synchronized int getPendingUpdatesCount() { + int pendingUpdatesCount = 0; + for (Entry> e : numericUpdates.entrySet()) { + pendingUpdatesCount += e.getValue().size(); + } + return pendingUpdatesCount; + } + // Call only from assert! public synchronized boolean verifyDocCounts() { int count; @@ -102,10 +137,26 @@ return true; } - // Get reader for searching/deleting - public synchronized SegmentReader getReader(IOContext context) throws IOException { - //System.out.println(" livedocs=" + rld.liveDocs); - + public synchronized void reopenReader(IOContext context) throws IOException { + if (reader != null) { + SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount); + boolean reopened = false; + try { + reader.decRef(); + reader = newReader; + if (liveDocs == null) { + liveDocs = reader.getLiveDocs(); + } + reopened = true; + } finally { + if (!reopened) { + newReader.decRef(); + } + } + } + } + + private synchronized SegmentReader doGetReader(IOContext context) throws IOException { if (reader == null) { // We steal returned ref: reader = new SegmentReader(info, context); @@ -112,25 +163,83 @@ if (liveDocs == null) { liveDocs = reader.getLiveDocs(); } - //System.out.println("ADD seg=" + rld.info + " isMerge=" + isMerge + " " + readerMap.size() + " in pool"); - //System.out.println(Thread.currentThread().getName() + ": getReader seg=" + info.name); } - + // Ref for caller reader.incRef(); return reader; } - + + private SegmentReader doGetReaderWithUpdates(IOContext context) throws IOException { + boolean checkpoint = false; + try { + // don't synchronize the entire method because we cannot call + // writer.checkpoint() while holding the RLD lock, otherwise we might hit + // a deadlock w/ e.g. a concurrent merging thread. + synchronized (this) { + checkpoint = writeLiveDocs(info.info.dir); + if (reader == null) { + // We steal returned ref: + reader = new SegmentReader(info, context); + if (liveDocs == null) { + liveDocs = reader.getLiveDocs(); + } + } else if (checkpoint) { + // enroll a new reader with the applied updates + reopenReader(context); + } + + // Ref for caller + reader.incRef(); + return reader; + } + } finally { + if (checkpoint) { + writer.checkpoint(); + } + } + } + + /** Returns a {@link SegmentReader} while applying field updates if requested. */ + public SegmentReader getReader(boolean applyFieldUpdates, IOContext context) throws IOException { + // if we need to apply field updates, we call writeLiveDocs and change + // SegmentInfos. Therefore must hold the lock on IndexWriter. This code + // ensures that readers that don't need to apply updates don't pay the + // cost of obtaining it. 
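The comment above describes a check-then-lock pattern; a minimal sketch of it outside Lucene, where the volatile flag stands in for hasFieldUpdates() and the private lock object for the IndexWriter monitor:

public class CheapCheckThenLock {

  private final Object writerLock = new Object(); // stands in for the IndexWriter monitor
  private volatile boolean hasPendingUpdates = false;

  void bufferUpdate() {
    hasPendingUpdates = true;
  }

  String getReader(boolean applyUpdates) {
    if (applyUpdates && hasPendingUpdates) {
      // slow path: only callers that actually need the updates take the big lock
      synchronized (writerLock) {
        hasPendingUpdates = false; // write the updates, reopen the reader, ...
        return "reader (updates applied)";
      }
    }
    // fast path: readers that don't care about updates never touch writerLock
    return "reader (as is)";
  }
}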
+ if (applyFieldUpdates && hasFieldUpdates()) { + synchronized (writer) { + return doGetReaderWithUpdates(context); + } + } else { + return doGetReader(context); + } + } + public synchronized void release(SegmentReader sr) throws IOException { assert info == sr.getSegmentInfo(); sr.decRef(); } + /** + * Updates the numeric doc value of {@code docID} under {@code field} to the + * given {@code value}. + */ + public synchronized void updateNumericDocValue(String field, int docID, Long value) { + assert Thread.holdsLock(writer); + assert docID >= 0 && docID < reader.maxDoc() : "out of bounds: docid=" + docID + " maxDoc=" + reader.maxDoc() + " seg=" + info.info.name + " docCount=" + info.info.getDocCount(); + Map updates = numericUpdates.get(field); + if (updates == null) { + updates = new HashMap(); + numericUpdates.put(field, updates); + } + updates.put(docID, value); + } + public synchronized boolean delete(int docID) { assert liveDocs != null; assert Thread.holdsLock(writer); assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " docCount=" + info.info.getDocCount(); - assert !shared; + assert !liveDocsShared; final boolean didDelete = liveDocs.get(docID); if (didDelete) { ((MutableBits) liveDocs).clear(docID); @@ -162,13 +271,11 @@ * it when you're done (ie, do not call release()). */ public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException { - if (reader == null) { - getReader(context).decRef(); - assert reader != null; - } - shared = true; + getReader(true, context).decRef(); // make sure we enroll a new reader if there are field updates + assert reader != null; + liveDocsShared = true; if (liveDocs != null) { - return new SegmentReader(reader.getSegmentInfo(), reader.core, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount); + return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount); } else { assert reader.getLiveDocs() == liveDocs; reader.incRef(); @@ -180,7 +287,7 @@ assert Thread.holdsLock(writer); assert info.info.getDocCount() > 0; //System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared); - if (shared) { + if (liveDocsShared) { // Copy on write: this means we've cloned a // SegmentReader sharing the current liveDocs // instance; must now make a private clone so we can @@ -192,9 +299,7 @@ } else { liveDocs = liveDocsFormat.newLiveDocs(liveDocs); } - shared = false; - } else { - assert liveDocs != null; + liveDocsShared = false; } } @@ -206,7 +311,7 @@ public synchronized Bits getReadOnlyLiveDocs() { //System.out.println("getROLiveDocs seg=" + info); assert Thread.holdsLock(writer); - shared = true; + liveDocsShared = true; //if (liveDocs != null) { //System.out.println(" liveCount=" + liveDocs.count()); //} @@ -222,61 +327,270 @@ // deletes onto the newly merged segment, so we can // discard them on the sub-readers: pendingDeleteCount = 0; + numericUpdates.clear(); + mergingUpdates.clear(); } - // Commit live docs to the directory (writes new - // _X_N.del files); returns true if it wrote the file - // and false if there were no new deletes to write: + // Commit live docs (writes new _X_N.del files) and field updates (writes new + // _X_N updates files) to the directory; returns true if it wrote any file + // and false if there were no new deletes or updates to write: + // TODO (DVU_RENAME) to 
writeDeletesAndUpdates public synchronized boolean writeLiveDocs(Directory dir) throws IOException { - //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount); - if (pendingDeleteCount != 0) { - // We have new deletes + assert Thread.holdsLock(writer); + //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount + " numericUpdates=" + numericUpdates); + final boolean hasFieldUpdates = hasFieldUpdates(); + if (pendingDeleteCount == 0 && !hasFieldUpdates) { + return false; + } + + // We have new deletes or updates + if (pendingDeleteCount > 0) { assert liveDocs.length() == info.info.getDocCount(); + } + + // Do this so we can delete any created files on + // exception; this saves all codecs from having to do + // it: + TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); + + // We can write directly to the actual name (vs to a + // .tmp & renaming it) because the file is not live + // until segments file is written: + boolean success = false; + try { + Codec codec = info.info.getCodec(); + if (pendingDeleteCount > 0) { + codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT); + } + + // apply numeric updates if there are any + if (hasFieldUpdates) { + // reader could be null e.g. for a just merged segment (from + // IndexWriter.commitMergedDeletes). + final SegmentReader reader = this.reader == null ? new SegmentReader(info, IOContext.READONCE) : this.reader; + try { + // clone FieldInfos so that we can update their numericUpdatesGen + // separately from the reader's infos and write them to a new + // fieldInfos_gen file + FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap); + // cannot use builder.add(reader.getFieldInfos()) because it does not clone FI.attributes + for (FieldInfo fi : reader.getFieldInfos()) { + FieldInfo clone = builder.add(fi); + // copy the stuff FieldInfos.Builder doesn't copy + if (fi.attributes() != null) { + for (Entry e : fi.attributes().entrySet()) { + clone.putAttribute(e.getKey(), e.getValue()); + } + } + } + // create new fields or update existing ones to have NumericDV type +// for (String f : numericUpdates.keySet()) { +// builder.addOrUpdate(f, NumericDocValuesField.TYPE); +// } + + final FieldInfos fieldInfos = builder.finish(); + final long nextDocValuesGen = info.getNextDocValuesGen(); + final String segmentSuffix = Long.toString(nextDocValuesGen, Character.MAX_RADIX); + final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, IOContext.DEFAULT, segmentSuffix, true); + final DocValuesFormat docValuesFormat = codec.docValuesFormat(); + final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state); + boolean fieldsConsumerSuccess = false; + try { + for (Entry> e : numericUpdates.entrySet()) { + final String field = e.getKey(); + final Map updates = e.getValue(); + final FieldInfo fieldInfo = fieldInfos.fieldInfo(field); - // Do this so we can delete any created files on - // exception; this saves all codecs from having to do - // it: - TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); + if (fieldInfo == null || fieldInfo.getDocValuesType() != DocValuesType.NUMERIC) { + throw new UnsupportedOperationException( + "cannot update docvalues in a segment with no docvalues field: segment=" + info + ", field=" + field); + } +// assert fieldInfo != null; - // We can write directly to the actual 
name (vs to a - // .tmp & renaming it) because the file is not live - // until segments file is written: - boolean success = false; - try { - info.info.getCodec().liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT); - success = true; - } finally { - if (!success) { - // Advance only the nextWriteDelGen so that a 2nd - // attempt to write will write to a new file - info.advanceNextWriteDelGen(); + info.setDocValuesGen(fieldInfo.number, nextDocValuesGen); + + // write the numeric updates to a new gen'd docvalues file + fieldsConsumer.addNumericField(fieldInfo, new Iterable() { + @SuppressWarnings("synthetic-access") + final NumericDocValues currentValues = reader.getNumericDocValues(field); + @Override + public Iterator iterator() { + return new Iterator() { - // Delete any partially created file(s): - for(String fileName : trackingDir.getCreatedFiles()) { - try { - dir.deleteFile(fileName); - } catch (Throwable t) { - // Ignore so we throw only the first exc + @SuppressWarnings("synthetic-access") + final int maxDoc = reader.maxDoc(); + int curDoc = -1; + + @Override + public boolean hasNext() { + return curDoc < maxDoc - 1; + } + + @Override + public Number next() { + if (++curDoc >= maxDoc) { + throw new NoSuchElementException("no more documents to return values for"); + } + Long updatedValue = updates.get(curDoc); + if (updatedValue == null) { + updatedValue = Long.valueOf(currentValues.get(curDoc)); + } else if (updatedValue == NumericUpdate.MISSING) { + updatedValue = null; + } + return updatedValue; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("this iterator does not support removing elements"); + } + + }; + } + }); } + fieldsConsumerSuccess = true; + } finally { + if (fieldsConsumerSuccess) { + fieldsConsumer.close(); + } else { + IOUtils.closeWhileHandlingException(fieldsConsumer); + } } + } finally { + if (reader != this.reader) { + reader.close(); + } } } - - // If we hit an exc in the line above (eg disk full) - // then info's delGen remains pointing to the previous - // (successfully written) del docs: + success = true; + } finally { + if (!success) { + // Advance only the nextWriteDelGen so that a 2nd + // attempt to write will write to a new file + if (pendingDeleteCount > 0) { + info.advanceNextWriteDelGen(); + } + + // Advance only the nextWriteDocValuesGen so that a 2nd + // attempt to write will write to a new file + if (hasFieldUpdates) { + info.advanceNextWriteDocValuesGen(); + } + + // Delete any partially created file(s): + for (String fileName : trackingDir.getCreatedFiles()) { + try { + dir.deleteFile(fileName); + } catch (Throwable t) { + // Ignore so we throw only the first exc + } + } + } + } + + // If we hit an exc in the line above (eg disk full) + // then info's delGen remains pointing to the previous + // (successfully written) del docs: + if (pendingDeleteCount > 0) { info.advanceDelGen(); info.setDelCount(info.getDelCount() + pendingDeleteCount); - pendingDeleteCount = 0; - return true; - } else { - return false; } + + if (hasFieldUpdates) { + info.advanceDocValuesGen(); + // copy all the updates to mergingUpdates, so they can later be applied to the merged segment + if (isMerging) { + copyUpdatesToMerging(); + } + numericUpdates.clear(); + } + + info.addUpdatesFiles(trackingDir.getCreatedFiles()); + + return true; } + private void copyUpdatesToMerging() { + for (Entry> e : numericUpdates.entrySet()) { + String field = e.getKey(); + Map merging = 
mergingUpdates.get(field); + if (merging == null) { + mergingUpdates.put(field, new HashMap(e.getValue())); + } else { + merging.putAll(e.getValue()); + } + } + } + + /** + * Indicates whether this segment is currently being merged. Call this just + * before the segment is being merged with {@code true} and when the merge has + * finished and all updates have been applied to the merged segment, call this + * with {@code false}. + */ + public synchronized void setMerging(boolean isMerging) { + this.isMerging = isMerging; + if (!isMerging) { + mergingUpdates.clear(); + } + } + + /** + * Called from IndexWriter after applying deletes to the merged segment, while + * it was being merged. + */ + public synchronized void setMergingUpdates(Map> updates) { + for (Entry> e : updates.entrySet()) { + int doc = e.getKey().intValue(); + for (Entry docUpdates : e.getValue().entrySet()) { + String field = docUpdates.getKey(); + Long value = docUpdates.getValue(); + Map fieldUpdates = numericUpdates.get(field); + if (fieldUpdates == null) { + fieldUpdates = new HashMap(); + numericUpdates.put(field, fieldUpdates); + } + fieldUpdates.put(doc, value); + } + } + } + + /** Returns updates that came in while this segment was merging. */ + public synchronized Map> getMergingUpdates() { + copyUpdatesToMerging(); + if (mergingUpdates.isEmpty()) { + return null; + } + + Map> updates = new HashMap>(); + for (Entry> e : mergingUpdates.entrySet()) { + String field = e.getKey(); + for (Entry fieldUpdates : e.getValue().entrySet()) { + Integer doc = fieldUpdates.getKey(); + Long value = fieldUpdates.getValue(); + Map docUpdates = updates.get(doc); + if (docUpdates == null) { + docUpdates = new HashMap(); + updates.put(doc, docUpdates); + } + docUpdates.put(field, value); + } + } + + mergingUpdates.clear(); + return updates; + } + @Override public String toString() { - return "ReadersAndLiveDocs(seg=" + info + " pendingDeleteCount=" + pendingDeleteCount + " shared=" + shared + ")"; + StringBuilder sb = new StringBuilder(); + sb.append("ReadersAndLiveDocs(seg=").append(info); + sb.append(" pendingDeleteCount=").append(pendingDeleteCount); + sb.append(" liveDocsShared=").append(liveDocsShared); + sb.append(" pendingUpdatesCount=").append(getPendingUpdatesCount()); + return sb.toString(); } + } Index: lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java (working copy) @@ -26,17 +26,15 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.FieldsProducer; import org.apache.lucene.codecs.PostingsFormat; -import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.codecs.TermVectorsReader; -import org.apache.lucene.index.FieldInfo.DocValuesType; import org.apache.lucene.index.SegmentReader.CoreClosedListener; import org.apache.lucene.store.CompoundFileDirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; -import org.apache.lucene.util.Bits; import org.apache.lucene.util.CloseableThreadLocal; import org.apache.lucene.util.IOUtils; @@ -44,18 +42,16 @@ * SegmentReader is cloned or reopened */ final class SegmentCoreReaders { - // Counts how many other reader 
share the core objects + // Counts how many other readers share the core objects // (freqStream, proxStream, tis, etc.) of this reader; // when coreRef drops to 0, these core objects may be // closed. A given instance of SegmentReader may be - // closed, even those it shares core objects with other + // closed, even though it shares core objects with other // SegmentReaders: private final AtomicInteger ref = new AtomicInteger(1); final FieldInfos fieldInfos; - final FieldsProducer fields; - final DocValuesProducer dvProducer; final DocValuesProducer normsProducer; private final SegmentReader owner; @@ -66,7 +62,7 @@ // TODO: make a single thread local w/ a // Thingy class holding fieldsReader, termVectorsReader, - // normsProducer, dvProducer + // normsProducer final CloseableThreadLocal fieldsReaderLocal = new CloseableThreadLocal() { @Override @@ -82,20 +78,6 @@ } }; - final CloseableThreadLocal> docValuesLocal = new CloseableThreadLocal>() { - @Override - protected Map initialValue() { - return new HashMap(); - } - }; - - final CloseableThreadLocal> docsWithFieldLocal = new CloseableThreadLocal>() { - @Override - protected Map initialValue() { - return new HashMap(); - } - }; - final CloseableThreadLocal> normsLocal = new CloseableThreadLocal>() { @Override protected Map initialValue() { @@ -120,8 +102,9 @@ cfsReader = null; cfsDir = dir; } + fieldInfos = codec.fieldInfosFormat().getFieldInfosReader().read(cfsDir, si.info.name, IOContext.READONCE); - + final PostingsFormat format = codec.postingsFormat(); final SegmentReadState segmentReadState = new SegmentReadState(cfsDir, si.info, fieldInfos, context); // Ask codec for its Fields @@ -131,13 +114,6 @@ // TODO: since we don't write any norms file if there are no norms, // kinda jaky to assume the codec handles the case of no norms file at all gracefully?! 
- if (fieldInfos.hasDocValues()) { - dvProducer = codec.docValuesFormat().fieldsProducer(segmentReadState); - assert dvProducer != null; - } else { - dvProducer = null; - } - if (fieldInfos.hasNorms()) { normsProducer = codec.normsFormat().normsProducer(segmentReadState); assert normsProducer != null; @@ -167,146 +143,14 @@ this.owner = owner; } + int getRefCount() { + return ref.get(); + } + void incRef() { ref.incrementAndGet(); } - NumericDocValues getNumericDocValues(String field) throws IOException { - FieldInfo fi = fieldInfos.fieldInfo(field); - if (fi == null) { - // Field does not exist - return null; - } - if (fi.getDocValuesType() == null) { - // Field was not indexed with doc values - return null; - } - if (fi.getDocValuesType() != DocValuesType.NUMERIC) { - // DocValues were not numeric - return null; - } - - assert dvProducer != null; - - Map dvFields = docValuesLocal.get(); - - NumericDocValues dvs = (NumericDocValues) dvFields.get(field); - if (dvs == null) { - dvs = dvProducer.getNumeric(fi); - dvFields.put(field, dvs); - } - - return dvs; - } - - BinaryDocValues getBinaryDocValues(String field) throws IOException { - FieldInfo fi = fieldInfos.fieldInfo(field); - if (fi == null) { - // Field does not exist - return null; - } - if (fi.getDocValuesType() == null) { - // Field was not indexed with doc values - return null; - } - if (fi.getDocValuesType() != DocValuesType.BINARY) { - // DocValues were not binary - return null; - } - - assert dvProducer != null; - - Map dvFields = docValuesLocal.get(); - - BinaryDocValues dvs = (BinaryDocValues) dvFields.get(field); - if (dvs == null) { - dvs = dvProducer.getBinary(fi); - dvFields.put(field, dvs); - } - - return dvs; - } - - SortedDocValues getSortedDocValues(String field) throws IOException { - FieldInfo fi = fieldInfos.fieldInfo(field); - if (fi == null) { - // Field does not exist - return null; - } - if (fi.getDocValuesType() == null) { - // Field was not indexed with doc values - return null; - } - if (fi.getDocValuesType() != DocValuesType.SORTED) { - // DocValues were not sorted - return null; - } - - assert dvProducer != null; - - Map dvFields = docValuesLocal.get(); - - SortedDocValues dvs = (SortedDocValues) dvFields.get(field); - if (dvs == null) { - dvs = dvProducer.getSorted(fi); - dvFields.put(field, dvs); - } - - return dvs; - } - - SortedSetDocValues getSortedSetDocValues(String field) throws IOException { - FieldInfo fi = fieldInfos.fieldInfo(field); - if (fi == null) { - // Field does not exist - return null; - } - if (fi.getDocValuesType() == null) { - // Field was not indexed with doc values - return null; - } - if (fi.getDocValuesType() != DocValuesType.SORTED_SET) { - // DocValues were not sorted - return null; - } - - assert dvProducer != null; - - Map dvFields = docValuesLocal.get(); - - SortedSetDocValues dvs = (SortedSetDocValues) dvFields.get(field); - if (dvs == null) { - dvs = dvProducer.getSortedSet(fi); - dvFields.put(field, dvs); - } - - return dvs; - } - - Bits getDocsWithField(String field) throws IOException { - FieldInfo fi = fieldInfos.fieldInfo(field); - if (fi == null) { - // Field does not exist - return null; - } - if (fi.getDocValuesType() == null) { - // Field was not indexed with doc values - return null; - } - - assert dvProducer != null; - - Map dvFields = docsWithFieldLocal.get(); - - Bits dvs = dvFields.get(field); - if (dvs == null) { - dvs = dvProducer.getDocsWithField(fi); - dvFields.put(field, dvs); - } - - return dvs; - } - NumericDocValues getNormValues(String field) throws 
IOException { FieldInfo fi = fieldInfos.fieldInfo(field); if (fi == null) { @@ -332,8 +176,9 @@ void decRef() throws IOException { if (ref.decrementAndGet() == 0) { - IOUtils.close(termVectorsLocal, fieldsReaderLocal, docValuesLocal, normsLocal, docsWithFieldLocal, fields, - dvProducer, termVectorsReaderOrig, fieldsReaderOrig, cfsReader, normsProducer); +// System.err.println("--- closing core readers"); + IOUtils.close(termVectorsLocal, fieldsReaderLocal, normsLocal, fields, termVectorsReaderOrig, fieldsReaderOrig, + cfsReader, normsProducer); notifyCoreClosedListeners(); } } @@ -356,12 +201,12 @@ /** Returns approximate RAM bytes used */ public long ramBytesUsed() { - return ((dvProducer!=null) ? dvProducer.ramBytesUsed() : 0) + - ((normsProducer!=null) ? normsProducer.ramBytesUsed() : 0) + + return ((normsProducer!=null) ? normsProducer.ramBytesUsed() : 0) + ((fields!=null) ? fields.ramBytesUsed() : 0) + ((fieldsReaderOrig!=null)? fieldsReaderOrig.ramBytesUsed() : 0) + ((termVectorsReaderOrig!=null) ? termVectorsReaderOrig.ramBytesUsed() : 0); } + @Override public String toString() { return "SegmentCoreReader(owner=" + owner + ")"; Index: lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java (working copy) @@ -119,7 +119,7 @@ public void setCodec(Codec codec) { assert this.codec == null; if (codec == null) { - throw new IllegalArgumentException("segmentCodecs must be non-null"); + throw new IllegalArgumentException("codec must be non-null"); } this.codec = codec; } @@ -170,7 +170,6 @@ * left off when there are no deletions).

*/ public String toString(Directory dir, int delCount) { - StringBuilder s = new StringBuilder(); s.append(name).append('(').append(version == null ? "?" : version).append(')').append(':'); char cfs = getUseCompoundFile() ? 'c' : 'C'; Index: lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java (working copy) @@ -19,7 +19,10 @@ import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.apache.lucene.store.Directory; @@ -27,9 +30,8 @@ * fields. * * @lucene.experimental */ - -public class SegmentInfoPerCommit { - +public class SegmentInfoPerCommit { // TODO (DVU_RENAME) to SegmentCommitInfo + /** The {@link SegmentInfo} that we wrap. */ public final SegmentInfo info; @@ -44,15 +46,35 @@ // attempt to write: private long nextWriteDelGen; + // holds field.number to docValuesGen mapping + // TODO (DVU_FIELDINFOS_GEN) once we gen FieldInfos, get rid of this; every FieldInfo will record its dvGen + private final Map fieldDocValuesGens = new HashMap(); + + // Generation number of the docValues (-1 if there are no field updates) + private long docValuesGen; + + // Normally 1 + docValuesGen, unless an exception was hit on last attempt to + // write + private long nextWriteDocValuesGen; + + // Tracks the files with field updates + private Set updatesFiles = new HashSet(); + private volatile long sizeInBytes = -1; - /** Sole constructor. - * @param info {@link SegmentInfo} that we wrap - * @param delCount number of deleted documents in this segment - * @param delGen deletion generation number (used to name - deletion files) + /** + * Sole constructor. + * + * @param info + * {@link SegmentInfo} that we wrap + * @param delCount + * number of deleted documents in this segment + * @param delGen + * deletion generation number (used to name deletion files) + * @param docValuesGen + * doc-values generation number (used to name docvalues files) **/ - public SegmentInfoPerCommit(SegmentInfo info, int delCount, long delGen) { + public SegmentInfoPerCommit(SegmentInfo info, int delCount, long delGen, long docValuesGen) { this.info = info; this.delCount = delCount; this.delGen = delGen; @@ -61,8 +83,23 @@ } else { nextWriteDelGen = delGen+1; } + + this.docValuesGen = docValuesGen; + if (docValuesGen == -1) { + nextWriteDocValuesGen = 1; + } else { + nextWriteDocValuesGen = docValuesGen + 1; + } } + public Set getUpdatesFiles() { + return new HashSet(updatesFiles); + } + + public void addUpdatesFiles(Set files) { + updatesFiles.addAll(files); + } + /** Called when we succeed in writing deletes */ void advanceDelGen() { delGen = nextWriteDelGen; @@ -76,6 +113,21 @@ void advanceNextWriteDelGen() { nextWriteDelGen++; } + + /** Called when we succeed in writing docvalues updates */ + void advanceDocValuesGen() { + docValuesGen = nextWriteDocValuesGen; + nextWriteDocValuesGen = docValuesGen + 1; + sizeInBytes = -1; + } + + /** + * Called if there was an exception while writing docvalues updates, so that + * we don't try to write to the same file more than once. + */ + void advanceNextWriteDocValuesGen() { + nextWriteDocValuesGen++; + } /** Returns total size in bytes of all files for this * segment. 
*/ @@ -96,9 +148,15 @@ // Start from the wrapped info's files: Collection files = new HashSet(info.files()); + // TODO we could rely on TrackingDir.getCreatedFiles() (like we do for + // updates) and then maybe even be able to remove LiveDocsFormat.files(). + // Must separately add any live docs files: info.getCodec().liveDocsFormat().files(this, files); + // Must separately add any field updates files + files.addAll(updatesFiles); + return files; } @@ -115,20 +173,6 @@ sizeInBytes = -1; } - void clearDelGen() { - delGen = -1; - sizeInBytes = -1; - } - - /** - * Sets the generation number of the live docs file. - * @see #getDelGen() - */ - public void setDelGen(long delGen) { - this.delGen = delGen; - sizeInBytes = -1; - } - /** Returns true if there are any deletions for the * segment at this commit. */ public boolean hasDeletions() { @@ -135,7 +179,43 @@ return delGen != -1; } + /** Returns true if there are any field updates for the segment in this commit. */ + public boolean hasFieldUpdates() { + return docValuesGen != -1; + } + + /** Returns the next available generation number of the docvalues files. */ + public long getNextDocValuesGen() { + return nextWriteDocValuesGen; + } + /** + * Returns the docvalues generation of this field, or -1 if there are + * no updates to it. + */ + public long getDocValuesGen(int fieldNumber) { + Long gen = fieldDocValuesGens.get(fieldNumber); + return gen == null ? -1 : gen.longValue(); + } + + /** Sets the docvalues generation for this field. */ + public void setDocValuesGen(int fieldNumber, long gen) { + fieldDocValuesGens.put(fieldNumber, gen); + } + + public Map getFieldDocValuesGens() { + return fieldDocValuesGens; + } + + /** + * Returns the generation number of the field infos file or -1 if there are no + * field updates yet. + */ + public long getDocValuesGen() { + return docValuesGen; + } + + /** * Returns the next available generation number * of the live docs file. */ @@ -174,17 +254,25 @@ if (delGen != -1) { s += ":delGen=" + delGen; } + if (docValuesGen != -1) { + s += ":docValuesGen=" + docValuesGen; + } return s; } @Override public SegmentInfoPerCommit clone() { - SegmentInfoPerCommit other = new SegmentInfoPerCommit(info, delCount, delGen); + SegmentInfoPerCommit other = new SegmentInfoPerCommit(info, delCount, delGen, docValuesGen); // Not clear that we need to carry over nextWriteDelGen // (i.e. 
do we ever clone after a failed write and // before the next successful write?), but just do it to // be safe: other.nextWriteDelGen = nextWriteDelGen; + other.nextWriteDocValuesGen = nextWriteDocValuesGen; + + other.updatesFiles.addAll(updatesFiles); + + other.fieldDocValuesGens.putAll(fieldDocValuesGens); return other; } } Index: lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java (working copy) @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.lucene.codecs.Codec; @@ -35,7 +36,7 @@ import org.apache.lucene.codecs.LiveDocsFormat; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.ChecksumIndexOutput; -import org.apache.lucene.store.DataOutput; // javadocs +import org.apache.lucene.store.DataOutput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -42,7 +43,6 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.NoSuchDirectoryException; import org.apache.lucene.util.IOUtils; -import org.apache.lucene.util.ThreadInterruptedException; /** * A collection of segmentInfo objects with methods for operating on @@ -111,11 +111,12 @@ */ public final class SegmentInfos implements Cloneable, Iterable { - /** - * The file format version for the segments_N codec header - */ + /** The file format version for the segments_N codec header, up to 4.4. */ public static final int VERSION_40 = 0; + /** The file format version for the segments_N codec header, since 4.5+. */ + public static final int VERSION_45 = 1; + /** Used for the segments.gen file only! * Whenever you add a new format, make it 1 smaller (negative version logic)! 
*/ public static final int FORMAT_SEGMENTS_GEN_CURRENT = -2; @@ -319,7 +320,7 @@ throw new IndexFormatTooOldException(input, magic, CodecUtil.CODEC_MAGIC, CodecUtil.CODEC_MAGIC); } // 4.0+ - CodecUtil.checkHeaderNoMagic(input, "segments", VERSION_40, VERSION_40); + int format = CodecUtil.checkHeaderNoMagic(input, "segments", VERSION_40, VERSION_45); version = input.readLong(); counter = input.readInt(); int numSegments = input.readInt(); @@ -326,7 +327,7 @@ if (numSegments < 0) { throw new CorruptIndexException("invalid segment count: " + numSegments + " (resource: " + input + ")"); } - for(int seg=0;seg= VERSION_45) { + docValuesGen = input.readLong(); + } + SegmentInfoPerCommit siPerCommit = new SegmentInfoPerCommit(info, delCount, delGen, docValuesGen); + if (format >= VERSION_45) { + int numUpdates = input.readInt(); + for (int i = 0; i < numUpdates; i++) { + siPerCommit.setDocValuesGen(input.readInt(), input.readLong()); + } + siPerCommit.addUpdatesFiles(input.readStringSet()); + } + add(siPerCommit); } userData = input.readStringStringMap(); @@ -395,7 +408,7 @@ try { segnOutput = new ChecksumIndexOutput(directory.createOutput(segmentFileName, IOContext.DEFAULT)); - CodecUtil.writeHeader(segnOutput, "segments", VERSION_40); + CodecUtil.writeHeader(segnOutput, "segments", VERSION_45); segnOutput.writeLong(version); segnOutput.writeInt(counter); // write counter segnOutput.writeInt(size()); // write infos @@ -405,6 +418,14 @@ segnOutput.writeString(si.getCodec().getName()); segnOutput.writeLong(siPerCommit.getDelGen()); segnOutput.writeInt(siPerCommit.getDelCount()); + segnOutput.writeLong(siPerCommit.getDocValuesGen()); + Map docValuesUpdatesGen = siPerCommit.getFieldDocValuesGens(); + segnOutput.writeInt(docValuesUpdatesGen.size()); + for (Entry e : docValuesUpdatesGen.entrySet()) { + segnOutput.writeInt(e.getKey()); + segnOutput.writeLong(e.getValue()); + } + segnOutput.writeStringSet(siPerCommit.getUpdatesFiles()); assert si.dir == directory; assert siPerCommit.getDelCount() <= si.getDocCount(); @@ -815,6 +836,7 @@ files.addAll(info.files()); } } + return files; } Index: lucene/core/src/java/org/apache/lucene/index/SegmentReader.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentReader.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/SegmentReader.java (working copy) @@ -18,13 +18,24 @@ */ import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; -import org.apache.lucene.store.Directory; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.codecs.TermVectorsReader; -import org.apache.lucene.search.FieldCache; // javadocs +import org.apache.lucene.index.FieldInfo.DocValuesType; +import org.apache.lucene.search.FieldCache; +import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.CloseableThreadLocal; +import org.apache.lucene.util.RefCount; /** * IndexReader implementation over a single segment. 
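A minimal sketch (not from the patch; variable names are illustrative only) of how a docvalues generation becomes the suffix of the gen'd files, as done in ReadersAndLiveDocs.writeLiveDocs above and in SegmentReader.newDocValuesProducer below:

    long docValuesGen = 11;                                            // per-commit generation from SegmentInfoPerCommit
    String suffix = Long.toString(docValuesGen, Character.MAX_RADIX);  // base-36 encoding, here "b"
    // gen == -1 (no field updates yet) keeps the empty suffix, i.e. the base docvalues file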
@@ -45,6 +56,23 @@ final SegmentCoreReaders core; + final CloseableThreadLocal> docValuesLocal = new CloseableThreadLocal>() { + @Override + protected Map initialValue() { + return new HashMap(); + } + }; + + final CloseableThreadLocal> docsWithFieldLocal = new CloseableThreadLocal>() { + @Override + protected Map initialValue() { + return new HashMap(); + } + }; + + final Map dvProducers = new HashMap(); + final Map> genDVProducers = new HashMap>(); + /** * Constructs a new SegmentReader with a new core. * @throws CorruptIndexException if the index is corrupt @@ -54,16 +82,41 @@ public SegmentReader(SegmentInfoPerCommit si, IOContext context) throws IOException { this.si = si; core = new SegmentCoreReaders(this, si.info.dir, si, context); + boolean success = false; + final Codec codec = si.info.getCodec(); try { if (si.hasDeletions()) { // NOTE: the bitvector is stored using the regular directory, not cfs - liveDocs = si.info.getCodec().liveDocsFormat().readLiveDocs(directory(), si, IOContext.READONCE); + liveDocs = codec.liveDocsFormat().readLiveDocs(directory(), si, IOContext.READONCE); } else { assert si.getDelCount() == 0; liveDocs = null; } numDocs = si.info.getDocCount() - si.getDelCount(); + + if (core.fieldInfos.hasDocValues()) { + final Directory dir = core.cfsReader != null ? core.cfsReader : si.info.dir; + final DocValuesFormat dvFormat = codec.docValuesFormat(); + // initialize the per generation numericDVProducers and put the correct + // DVProducer for each field + final Map> genInfos = getGenInfos(si); + + for (Entry> e : genInfos.entrySet()) { + Long gen = e.getKey(); + List infos = e.getValue(); + RefCount dvp = genDVProducers.get(gen); + if (dvp == null) { + dvp = newDocValuesProducer(si, context, dir, dvFormat, gen, infos); + assert dvp != null; + genDVProducers.put(gen, dvp); + } + for (FieldInfo fi : infos) { + dvProducers.put(fi.name, dvp.get()); + } + } + } + success = true; } finally { // With lock-less commits, it's entirely possible (and @@ -72,7 +125,7 @@ // of things that were opened so that we don't have to // wait for a GC to do so. if (!success) { - core.decRef(); + doClose(); } } } @@ -80,8 +133,8 @@ /** Create new SegmentReader sharing core from a previous * SegmentReader and loading new live docs from a new * deletes file. Used by openIfChanged. */ - SegmentReader(SegmentInfoPerCommit si, SegmentCoreReaders core) throws IOException { - this(si, core, + SegmentReader(SegmentInfoPerCommit si, SegmentReader sr) throws IOException { + this(si, sr, si.info.getCodec().liveDocsFormat().readLiveDocs(si.info.dir, si, IOContext.READONCE), si.info.getDocCount() - si.getDelCount()); } @@ -90,15 +143,88 @@ * SegmentReader and using the provided in-memory * liveDocs. Used by IndexWriter to provide a new NRT * reader */ - SegmentReader(SegmentInfoPerCommit si, SegmentCoreReaders core, Bits liveDocs, int numDocs) { + SegmentReader(SegmentInfoPerCommit si, SegmentReader sr, Bits liveDocs, int numDocs) throws IOException { this.si = si; - this.core = core; + this.liveDocs = liveDocs; + this.numDocs = numDocs; + this.core = sr.core; core.incRef(); + + // increment refCount of DocValuesProducers that are used by this reader + boolean success = false; + try { + if (core.fieldInfos.hasDocValues()) { + final Codec codec = si.info.getCodec(); + final Directory dir = core.cfsReader != null ? 
core.cfsReader : si.info.dir; + + final DocValuesFormat dvFormat = codec.docValuesFormat(); + final Map> genInfos = getGenInfos(si); + + for (Entry> e : genInfos.entrySet()) { + Long gen = e.getKey(); + List infos = e.getValue(); + RefCount dvp = genDVProducers.get(gen); + if (dvp == null) { + // check if this DVP gen is used by the given reader + dvp = sr.genDVProducers.get(gen); + if (dvp != null) { + // gen used by given reader, incRef its DVP + dvp.incRef(); + } else { + // this gen is not used by given reader, initialize a new one + dvp = newDocValuesProducer(si, IOContext.READ, dir, dvFormat, gen, infos); + } + assert dvp != null; + genDVProducers.put(gen, dvp); + } + for (FieldInfo fi : infos) { + dvProducers.put(fi.name, dvp.get()); + } + } + } + success = true; + } finally { + if (!success) { + doClose(); + } + } + } - assert liveDocs != null; - this.liveDocs = liveDocs; + // returns a gen->List mapping. Fields without DV updates have gen=-1 + private Map> getGenInfos(SegmentInfoPerCommit si) { + final Map> genInfos = new HashMap>(); + for (FieldInfo fi : core.fieldInfos) { + if (fi.getDocValuesType() == null) { + continue; + } + long gen = si.getDocValuesGen(fi.number); + List infos = genInfos.get(gen); + if (infos == null) { + infos = new ArrayList(); + genInfos.put(gen, infos); + } + infos.add(fi); + } + return genInfos; + } - this.numDocs = numDocs; + private RefCount newDocValuesProducer(SegmentInfoPerCommit si, IOContext context, Directory dir, + DocValuesFormat dvFormat, Long gen, List infos) throws IOException { + Directory dvDir = dir; + String segmentSuffix = ""; + if (gen.longValue() != -1) { + dvDir = si.info.dir; // gen'd files are written outside CFS, so use SegInfo directory + segmentSuffix = Long.toString(gen.longValue(), Character.MAX_RADIX); + } + + // set SegmentReadState to list only the fields that are relevant to that gen + SegmentReadState srs = new SegmentReadState(dvDir, si.info, new FieldInfos(infos.toArray(new FieldInfo[infos.size()])), context, segmentSuffix); + return new RefCount(dvFormat.fieldsProducer(srs)) { + @Override + protected void release() throws IOException { + object.close(); + } + }; } @Override @@ -110,7 +236,26 @@ @Override protected void doClose() throws IOException { //System.out.println("SR.close seg=" + si); - core.decRef(); + try { + core.decRef(); + } finally { + Throwable t = null; + for (RefCount dvp : genDVProducers.values()) { + try { + dvp.decRef(); + } catch (Throwable th) { + if (t == null) { + t = th; + } + } + } + if (t != null) { + if (t instanceof IOException) throw (IOException) t; + if (t instanceof RuntimeException) throw (RuntimeException) t; + if (t instanceof Error) throw (Error) t; + throw new RuntimeException(t); + } + } } @Override @@ -217,34 +362,140 @@ return this; } + // returns the FieldInfo that corresponds to the given field and type, or + // null if the field does not exist, or not indexed as the requested + // DocValuesType. 
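+  // The docvalues getters below pair this lookup with the dvProducers map, which the
+  // constructors populate with one ref-counted DocValuesProducer per docvalues generation.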
+ private FieldInfo getDVField(String field, DocValuesType type) { + FieldInfo fi = core.fieldInfos.fieldInfo(field); + if (fi == null) { + // Field does not exist + return null; + } + if (fi.getDocValuesType() == null) { + // Field was not indexed with doc values + return null; + } + if (fi.getDocValuesType() != type) { + // Field DocValues are different than requested type + return null; + } + + return fi; + } + @Override public NumericDocValues getNumericDocValues(String field) throws IOException { ensureOpen(); - return core.getNumericDocValues(field); + FieldInfo fi = getDVField(field, DocValuesType.NUMERIC); + if (fi == null) { + return null; + } + + DocValuesProducer dvProducer = dvProducers.get(field); + assert dvProducer != null; + + Map dvFields = docValuesLocal.get(); + + NumericDocValues dvs = (NumericDocValues) dvFields.get(field); + if (dvs == null) { + dvs = dvProducer.getNumeric(fi); + dvFields.put(field, dvs); + } + + return dvs; } @Override public Bits getDocsWithField(String field) throws IOException { ensureOpen(); - return core.getDocsWithField(field); + FieldInfo fi = core.fieldInfos.fieldInfo(field); + if (fi == null) { + // Field does not exist + return null; + } + if (fi.getDocValuesType() == null) { + // Field was not indexed with doc values + return null; + } + + DocValuesProducer dvProducer = dvProducers.get(field); + assert dvProducer != null; + + Map dvFields = docsWithFieldLocal.get(); + + Bits dvs = dvFields.get(field); + if (dvs == null) { + dvs = dvProducer.getDocsWithField(fi); + dvFields.put(field, dvs); + } + + return dvs; } @Override public BinaryDocValues getBinaryDocValues(String field) throws IOException { ensureOpen(); - return core.getBinaryDocValues(field); + FieldInfo fi = getDVField(field, DocValuesType.BINARY); + if (fi == null) { + return null; + } + + DocValuesProducer dvProducer = dvProducers.get(field); + assert dvProducer != null; + + Map dvFields = docValuesLocal.get(); + + BinaryDocValues dvs = (BinaryDocValues) dvFields.get(field); + if (dvs == null) { + dvs = dvProducer.getBinary(fi); + dvFields.put(field, dvs); + } + + return dvs; } @Override public SortedDocValues getSortedDocValues(String field) throws IOException { ensureOpen(); - return core.getSortedDocValues(field); + FieldInfo fi = getDVField(field, DocValuesType.SORTED); + if (fi == null) { + return null; + } + + DocValuesProducer dvProducer = dvProducers.get(field); + assert dvProducer != null; + + Map dvFields = docValuesLocal.get(); + + SortedDocValues dvs = (SortedDocValues) dvFields.get(field); + if (dvs == null) { + dvs = dvProducer.getSorted(fi); + dvFields.put(field, dvs); + } + + return dvs; } @Override public SortedSetDocValues getSortedSetDocValues(String field) throws IOException { ensureOpen(); - return core.getSortedSetDocValues(field); + FieldInfo fi = getDVField(field, DocValuesType.SORTED_SET); + if (fi == null) { + return null; + } + + DocValuesProducer dvProducer = dvProducers.get(field); + assert dvProducer != null; + + Map dvFields = docValuesLocal.get(); + + SortedSetDocValues dvs = (SortedSetDocValues) dvFields.get(field); + if (dvs == null) { + dvs = dvProducer.getSortedSet(fi); + dvFields.put(field, dvs); + } + + return dvs; } @Override @@ -284,9 +535,21 @@ core.removeCoreClosedListener(listener); } + private long dvRamBytesUsed() { + long ramBytesUsed = 0; + for (RefCount dvp : genDVProducers.values()) { + ramBytesUsed += dvp.get().ramBytesUsed(); + } + return ramBytesUsed; + } + /** Returns approximate RAM Bytes used */ public long ramBytesUsed() { 
ensureOpen(); - return (core!=null) ? core.ramBytesUsed() : 0; + long ramBytesUsed = dvRamBytesUsed(); + if (core != null) { + ramBytesUsed += core.ramBytesUsed(); + } + return ramBytesUsed; } } Index: lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java (working copy) @@ -71,16 +71,25 @@ * to {@link Directory#createOutput(String,IOContext)}. */ public final IOContext context; + /** True is this instance represents a field update. */ + public final boolean isFieldUpdate; // TODO (DVU_FIELDINFOS_GEN) once we gen FieldInfos, get rid of this + /** Sole constructor. */ public SegmentWriteState(InfoStream infoStream, Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, BufferedDeletes segDeletes, IOContext context) { + this(infoStream, directory, segmentInfo, fieldInfos, segDeletes, context, "", false); + } + + public SegmentWriteState(InfoStream infoStream, Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, + BufferedDeletes segDeletes, IOContext context, String segmentSuffix, boolean isFieldUpdate) { this.infoStream = infoStream; this.segDeletes = segDeletes; this.directory = directory; this.segmentInfo = segmentInfo; this.fieldInfos = fieldInfos; - segmentSuffix = ""; + this.segmentSuffix = segmentSuffix; this.context = context; + this.isFieldUpdate = isFieldUpdate; } /** Create a shallow copy of {@link SegmentWriteState} with a new segment suffix. */ @@ -93,5 +102,6 @@ this.segmentSuffix = segmentSuffix; segDeletes = state.segDeletes; delCountOnFlush = state.delCountOnFlush; + isFieldUpdate = state.isFieldUpdate; } } Index: lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java (revision 1523412) +++ lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java (working copy) @@ -162,7 +162,8 @@ readerShared[i] = false; newReaders[i] = newReader; } else { - if (newReaders[i].getSegmentInfo().getDelGen() == infos.info(i).getDelGen()) { + if (newReaders[i].getSegmentInfo().getDelGen() == infos.info(i).getDelGen() + && newReaders[i].getSegmentInfo().getDocValuesGen() == infos.info(i).getDocValuesGen()) { // No change; this reader will be shared between // the old and the new one, so we must incRef // it: @@ -169,11 +170,18 @@ readerShared[i] = true; newReaders[i].incRef(); } else { + // there are changes to the reader, either liveDocs or DV updates readerShared[i] = false; // Steal the ref returned by SegmentReader ctor: assert infos.info(i).info.dir == newReaders[i].getSegmentInfo().info.dir; - assert infos.info(i).hasDeletions(); - newReaders[i] = new SegmentReader(infos.info(i), newReaders[i].core); + assert infos.info(i).hasDeletions() || infos.info(i).hasFieldUpdates(); + if (newReaders[i].getSegmentInfo().getDelGen() == infos.info(i).getDelGen()) { + // only DV updates + newReaders[i] = new SegmentReader(infos.info(i), newReaders[i], newReaders[i].getLiveDocs(), newReaders[i].numDocs()); + } else { + // both DV and liveDocs have changed + newReaders[i] = new SegmentReader(infos.info(i), newReaders[i]); + } } } success = true; Index: lucene/core/src/java/org/apache/lucene/util/RefCount.java 
=================================================================== --- lucene/core/src/java/org/apache/lucene/util/RefCount.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/util/RefCount.java (working copy) @@ -0,0 +1,84 @@ +package org.apache.lucene.util; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Manages reference counting for a given object. Extensions can override + * {@link #release()} to do custom logic when reference counting hits 0. + */ +public class RefCount { + + private final AtomicInteger refCount = new AtomicInteger(1); + + protected final T object; + + public RefCount(T object) { + this.object = object; + } + + /** + * Called when reference counting hits 0. By default this method does nothing, + * but extensions can override to e.g. release resources attached to object + * that is managed by this class. + */ + protected void release() throws IOException {} + + /** + * Decrements the reference counting of this object. When reference counting + * hits 0, calls {@link #release()}. + */ + public final void decRef() throws IOException { + final int rc = refCount.decrementAndGet(); + if (rc == 0) { + boolean success = false; + try { + release(); + success = true; + } finally { + if (!success) { + // Put reference back on failure + refCount.incrementAndGet(); + } + } + } else if (rc < 0) { + throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement"); + } + } + + public final T get() { + return object; + } + + /** Returns the current reference count. */ + public final int getRefCount() { + return refCount.get(); + } + + /** + * Increments the reference count. Calls to this method must be matched with + * calls to {@link #decRef()}. 
+ */ + public final void incRef() { + refCount.incrementAndGet(); + } + +} + Index: lucene/core/src/test/org/apache/lucene/index/TestDoc.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestDoc.java (revision 1523412) +++ lucene/core/src/test/org/apache/lucene/index/TestDoc.java (working copy) @@ -239,7 +239,7 @@ } } - return new SegmentInfoPerCommit(info, 0, -1L); + return new SegmentInfoPerCommit(info, 0, -1L, -1L); } Index: lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java (revision 0) +++ lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java (working copy) @@ -0,0 +1,1046 @@ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.asserting.AssertingDocValuesFormat; +import org.apache.lucene.codecs.lucene40.Lucene40RWCodec; +import org.apache.lucene.codecs.lucene41.Lucene41RWCodec; +import org.apache.lucene.codecs.lucene42.Lucene42RWCodec; +import org.apache.lucene.codecs.lucene45.Lucene45Codec; +import org.apache.lucene.codecs.lucene45.Lucene45DocValuesFormat; +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; +import org.junit.Test; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +@SuppressCodecs({"Lucene40","Lucene41","Lucene42"}) +public class TestNumericDocValuesUpdates extends LuceneTestCase { + + private Document doc(int id) { + Document doc = new Document(); + doc.add(new StringField("id", "doc-" + id, Store.NO)); + // make sure we don't set the doc's value to 0, to not confuse with a document that's missing values + doc.add(new NumericDocValuesField("val", id + 1)); + return doc; + } + + @Test + public void testSimple() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + // make sure random config doesn't flush on us + conf.setMaxBufferedDocs(10); + conf.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH); + IndexWriter writer = new IndexWriter(dir, conf); + writer.addDocument(doc(0)); // val=1 + writer.addDocument(doc(1)); // val=2 + if (random().nextBoolean()) { // randomly commit before the update is sent + writer.commit(); + } + writer.updateNumericDocValue(new Term("id", "doc-0"), "val", 2L); // doc=0, exp=2 + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + assertEquals(1, reader.leaves().size()); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("val"); + assertEquals(2, ndv.get(0)); + assertEquals(2, ndv.get(1)); + reader.close(); + + dir.close(); + } + + @Test + public void testUpdateFewSegments() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(2); // generate few segments + conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); // prevent merges for this test + IndexWriter writer = new IndexWriter(dir, conf); + int numDocs = 10; + long[] expectedValues = new long[numDocs]; + for (int i = 0; i < numDocs; i++) { + writer.addDocument(doc(i)); + expectedValues[i] = i + 1; + } + writer.commit(); + + // update few docs + for (int i = 0; i < numDocs; i++) { + if (random().nextDouble() < 0.4) { + long value = (i + 1) * 2; + writer.updateNumericDocValue(new Term("id", "doc-" + i), "val", value); + expectedValues[i] = value; + } + } + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + NumericDocValues ndv = r.getNumericDocValues("val"); + assertNotNull(ndv); + for (int i = 0; i < r.maxDoc(); i++) { + long expected = expectedValues[i + context.docBase]; + long actual = ndv.get(i); + assertEquals(expected, actual); + } + } + + reader.close(); + dir.close(); + } + + @Test + public void testReopen() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + final boolean isNRT = random().nextBoolean(); + final DirectoryReader reader1; + if (isNRT) { + reader1 = DirectoryReader.open(writer, true); + } else { + writer.commit(); + reader1 = DirectoryReader.open(dir); + } + + // update doc + writer.updateNumericDocValue(new Term("id", "doc-0"), 
"val", 10L); // update doc-0's value to 10 + if (!isNRT) { + writer.commit(); + } + + // reopen reader and assert only it sees the update + final DirectoryReader reader2 = DirectoryReader.openIfChanged(reader1); + assertNotNull(reader2); + assertTrue(reader1 != reader2); + + assertEquals(1, reader1.leaves().get(0).reader().getNumericDocValues("val").get(0)); + assertEquals(10, reader2.leaves().get(0).reader().getNumericDocValues("val").get(0)); + + IOUtils.close(writer, reader1, reader2, dir); + } + + @Test + public void testUpdatesAndDeletes() throws Exception { + // create an index with a segment with only deletes, a segment with both + // deletes and updates and a segment with only updates + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // control segment flushing + conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); // prevent merges for this test + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 6; i++) { + writer.addDocument(doc(i)); + if (i % 2 == 1) { + writer.commit(); // create 2-docs segments + } + } + + // delete doc-1 and doc-2 + writer.deleteDocuments(new Term("id", "doc-1"), new Term("id", "doc-2")); // 1st and 2nd segments + + // update docs 3 and 5 + writer.updateNumericDocValue(new Term("id", "doc-3"), "val", 17L); + writer.updateNumericDocValue(new Term("id", "doc-5"), "val", 17L); + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + AtomicReader slow = SlowCompositeReaderWrapper.wrap(reader); + + Bits liveDocs = slow.getLiveDocs(); + boolean[] expectedLiveDocs = new boolean[] { true, false, false, true, true, true }; + for (int i = 0; i < expectedLiveDocs.length; i++) { + assertEquals(expectedLiveDocs[i], liveDocs.get(i)); + } + + long[] expectedValues = new long[] { 1, 2, 3, 17, 5, 17}; + NumericDocValues ndv = slow.getNumericDocValues("val"); + for (int i = 0; i < expectedValues.length; i++) { + assertEquals(expectedValues[i], ndv.get(i)); + } + + reader.close(); + dir.close(); + } + + @Test + public void testUpdatesWithDeletes() throws Exception { + // update and delete different documents in the same commit session + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // control segment flushing + IndexWriter writer = new IndexWriter(dir, conf); + + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + if (random().nextBoolean()) { + writer.commit(); + } + + writer.deleteDocuments(new Term("id", "doc-0")); + writer.updateNumericDocValue(new Term("id", "doc-1"), "val", 17L); + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + AtomicReader r = reader.leaves().get(0).reader(); + assertFalse(r.getLiveDocs().get(0)); + assertEquals(17, r.getNumericDocValues("val").get(1)); + + reader.close(); + dir.close(); + } + + @Test + public void testUpdateAndDeleteSameDocument() throws Exception { + // update and delete same document in same commit session + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + 
conf.setMaxBufferedDocs(10); // control segment flushing + IndexWriter writer = new IndexWriter(dir, conf); + + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + if (random().nextBoolean()) { + writer.commit(); + } + + writer.deleteDocuments(new Term("id", "doc-0")); + writer.updateNumericDocValue(new Term("id", "doc-0"), "val", 17L); + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + AtomicReader r = reader.leaves().get(0).reader(); + assertFalse(r.getLiveDocs().get(0)); + assertEquals(1, r.getNumericDocValues("val").get(0)); // deletes are currently applied first + + reader.close(); + dir.close(); + } + + @Test + public void testMultipleDocValuesTypes() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // prevent merges + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 4; i++) { + Document doc = new Document(); + doc.add(new StringField("dvUpdateKey", "dv", Store.NO)); + doc.add(new NumericDocValuesField("ndv", i)); + doc.add(new BinaryDocValuesField("bdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedDocValuesField("sdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedSetDocValuesField("ssdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedSetDocValuesField("ssdv", new BytesRef(Integer.toString(i * 2)))); + writer.addDocument(doc); + } + writer.commit(); + + // update all docs' ndv field + writer.updateNumericDocValue(new Term("dvUpdateKey", "dv"), "ndv", 17L); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + SortedDocValues sdv = r.getSortedDocValues("sdv"); + SortedSetDocValues ssdv = r.getSortedSetDocValues("ssdv"); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, ndv.get(i)); + bdv.get(i, scratch); + assertEquals(new BytesRef(Integer.toString(i)), scratch); + sdv.get(i, scratch); + assertEquals(new BytesRef(Integer.toString(i)), scratch); + ssdv.setDocument(i); + long ord = ssdv.nextOrd(); + ssdv.lookupOrd(ord, scratch); + assertEquals(i, Integer.parseInt(scratch.utf8ToString())); + if (i != 0) { + ord = ssdv.nextOrd(); + ssdv.lookupOrd(ord, scratch); + assertEquals(i * 2, Integer.parseInt(scratch.utf8ToString())); + } + assertEquals(SortedSetDocValues.NO_MORE_ORDS, ssdv.nextOrd()); + } + + reader.close(); + dir.close(); + } + + @Test + public void testMultipleNumericDocValues() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // prevent merges + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 2; i++) { + Document doc = new Document(); + doc.add(new StringField("dvUpdateKey", "dv", Store.NO)); + doc.add(new NumericDocValuesField("ndv1", i)); + doc.add(new NumericDocValuesField("ndv2", i)); + writer.addDocument(doc); + } + writer.commit(); + + // update all docs' ndv1 field + writer.updateNumericDocValue(new Term("dvUpdateKey", "dv"), "ndv1", 17L); + writer.close(); + + final DirectoryReader reader = 
DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv1 = r.getNumericDocValues("ndv1"); + NumericDocValues ndv2 = r.getNumericDocValues("ndv2"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, ndv1.get(i)); + assertEquals(i, ndv2.get(i)); + } + + reader.close(); + dir.close(); + } + + @Test + public void testDocumentWithNoValue() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 2; i++) { + Document doc = new Document(); + doc.add(new StringField("dvUpdateKey", "dv", Store.NO)); + if (i == 0) { // index only one document with value + doc.add(new NumericDocValuesField("ndv", 5)); + } + writer.addDocument(doc); + } + writer.commit(); + + // update all docs' ndv field + writer.updateNumericDocValue(new Term("dvUpdateKey", "dv"), "ndv", 17L); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, ndv.get(i)); + } + + reader.close(); + dir.close(); + } + + @Test + public void testUnsetValue() throws Exception { + assumeTrue("codec does not support docsWithField", defaultCodecSupportsDocsWithField()); + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 2; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc" + i, Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); + } + writer.commit(); + + // unset the value of 'doc0' + writer.updateNumericDocValue(new Term("id", "doc0"), "ndv", null); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + for (int i = 0; i < r.maxDoc(); i++) { + if (i == 0) { + assertEquals(0, ndv.get(i)); + } else { + assertEquals(5, ndv.get(i)); + } + } + + Bits docsWithField = r.getDocsWithField("ndv"); + assertFalse(docsWithField.get(0)); + assertTrue(docsWithField.get(1)); + + reader.close(); + dir.close(); + } + + @Test + public void testUpdateNonDocValueField() throws Exception { + // we don't support adding new fields or updating existing non-numeric-dv + // fields through numeric updates + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new StringField("foo", "bar", Store.NO)); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + try { + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", 17L); + fail("should not have allowed creating new fields through update"); + } catch (IllegalArgumentException e) { + // ok + } + + try { + writer.updateNumericDocValue(new Term("key", "doc"), "foo", 17L); + fail("should not have allowed updating an existing field to numeric-dv"); + } catch (IllegalArgumentException e) { + // ok + } + + writer.close(); + dir.close(); + } + + @Test + public void 
testDifferentDVFormatPerField() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setCodec(new Lucene45Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new Lucene45DocValuesFormat(); + } + }); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + doc.add(new SortedDocValuesField("sorted", new BytesRef("value"))); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", 17L); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + + AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + SortedDocValues sdv = r.getSortedDocValues("sorted"); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, ndv.get(i)); + sdv.get(i, scratch); + assertEquals(new BytesRef("value"), scratch); + } + + reader.close(); + dir.close(); + } + + @Test + public void testUpdateSameDocMultipleTimes() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", 17L); // update existing field + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", 3L); // update existing field 2nd time in this commit + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + final AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(3, ndv.get(i)); + } + reader.close(); + dir.close(); + } + + @Test + public void testSegmentMerges() throws Exception { + Directory dir = newDirectory(); + Random random = random(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); + IndexWriter writer = new IndexWriter(dir, conf.clone()); + + int docid = 0; + int numRounds = atLeast(10); + for (int rnd = 0; rnd < numRounds; rnd++) { + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new NumericDocValuesField("ndv", -1)); + int numDocs = atLeast(30); + for (int i = 0; i < numDocs; i++) { + doc.removeField("id"); + doc.add(new StringField("id", Integer.toString(docid++), Store.NO)); + writer.addDocument(doc); + } + + long value = rnd + 1; + writer.updateNumericDocValue(new Term("key", "doc"), "ndv", value); + + if (random.nextDouble() < 0.2) { // randomly delete some docs + writer.deleteDocuments(new Term("id", Integer.toString(random.nextInt(docid)))); + } + + // randomly commit or reopen-IW (or nothing), before forceMerge + if (random.nextDouble() < 0.4) { + writer.commit(); + } else if (random.nextDouble() < 0.1) { + writer.close(); + writer = new IndexWriter(dir, conf.clone()); + } + + // add another document with the current value, to be sure 
forceMerge has + // something to merge (for instance, it could be that CMS finished merging + // all segments down to 1 before the delete was applied, so when + // forceMerge is called, the index will have a single segment with deletes + // and some MPs might not merge it, thereby invalidating the test's + // assumption that the reader has no deletes). + doc = new Document(); + doc.add(new StringField("id", Integer.toString(docid++), Store.NO)); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new NumericDocValuesField("ndv", value)); + writer.addDocument(doc); + + writer.forceMerge(1, true); + final DirectoryReader reader; + if (random.nextBoolean()) { + writer.commit(); + reader = DirectoryReader.open(dir); + } else { + reader = DirectoryReader.open(writer, true); + } + + assertEquals(1, reader.leaves().size()); + final AtomicReader r = reader.leaves().get(0).reader(); + assertNull("index should have no deletes after forceMerge", r.getLiveDocs()); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + assertNotNull(ndv); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(value, ndv.get(i)); + } + reader.close(); + } + + writer.close(); + dir.close(); + } + + @Test + public void testUpdateDocumentByMultipleTerms() throws Exception { + // make sure the order of updates is respected, even when multiple terms affect the same document + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("k1", "v1", Store.NO)); + doc.add(new StringField("k2", "v2", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateNumericDocValue(new Term("k1", "v1"), "ndv", 17L); + writer.updateNumericDocValue(new Term("k2", "v2"), "ndv", 3L); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + final AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(3, ndv.get(i)); + } + reader.close(); + dir.close(); + } + + @Test + public void testManyReopensAndFields() throws Exception { + Directory dir = newDirectory(); + final Random random = random(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); + LogMergePolicy lmp = newLogMergePolicy(); + lmp.setMergeFactor(3); // merge often + conf.setMergePolicy(lmp); + IndexWriter writer = new IndexWriter(dir, conf); + + final boolean isNRT = random.nextBoolean(); + DirectoryReader reader; + if (isNRT) { + reader = DirectoryReader.open(writer, true); + } else { + writer.commit(); + reader = DirectoryReader.open(dir); + } + + final int numFields = random.nextInt(4) + 3; // 3-6 + final long[] fieldValues = new long[numFields]; + for (int i = 0; i < fieldValues.length; i++) { + fieldValues[i] = 1; + } + + int numRounds = atLeast(15); + int docID = 0; + for (int i = 0; i < numRounds; i++) { + int numDocs = atLeast(2); + // System.out.println("round=" + i + ", numDocs=" + numDocs); + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc-" + docID, Store.NO)); + doc.add(new StringField("key", "all", Store.NO)); // update key + // add all fields with their current (updated) value + for (int f = 0; f < fieldValues.length; 
f++) { + doc.add(new NumericDocValuesField("f" + f, fieldValues[f])); + } + writer.addDocument(doc); + ++docID; + } + + int fieldIdx = random.nextInt(fieldValues.length); + String updateField = "f" + fieldIdx; + writer.updateNumericDocValue(new Term("key", "all"), updateField, ++fieldValues[fieldIdx]); + // System.out.println("+++ updated field '" + updateField + "' to value " + fieldValues[fieldIdx]); + + if (random.nextDouble() < 0.2) { + int deleteDoc = random.nextInt(docID); // might also delete an already deleted document, ok! + writer.deleteDocuments(new Term("id", "doc-" + deleteDoc)); + // System.out.println("--- deleted document: doc-" + deleteDoc); + } + + // verify reader + if (!isNRT) { + writer.commit(); + } + + DirectoryReader newReader = DirectoryReader.openIfChanged(reader); + assertNotNull(newReader); + reader.close(); + reader = newReader; + assertTrue(reader.numDocs() > 0); // we delete at most one document per round + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + // System.out.println(((SegmentReader) r).getSegmentName()); + Bits liveDocs = r.getLiveDocs(); + for (int field = 0; field < fieldValues.length; field++) { + String f = "f" + field; + NumericDocValues ndv = r.getNumericDocValues(f); + assertNotNull(ndv); + int maxDoc = r.maxDoc(); + for (int doc = 0; doc < maxDoc; doc++) { + if (liveDocs == null || liveDocs.get(doc)) { + // System.out.println("doc=" + (doc + context.docBase) + " f='" + f + "' value=" + ndv.get(doc)); + assertEquals("invalid value for doc=" + (doc + context.docBase) + ", field=" + f, fieldValues[field], ndv.get(doc)); + } + } + } + } + // System.out.println(); + } + + IOUtils.close(writer, reader, dir); + } + + @Test + public void testUpdateSegmentWithNoDocValues() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + // prevent merges, otherwise by the time updates are applied + // (writer.close()), the segments might have merged and that update becomes + // legit. + conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); + IndexWriter writer = new IndexWriter(dir, conf); + + // first segment with NDV + Document doc = new Document(); + doc.add(new StringField("id", "doc0", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); + writer.commit(); + + // second segment with no NDV + doc = new Document(); + doc.add(new StringField("id", "doc1", Store.NO)); + writer.addDocument(doc); + writer.commit(); + + // update document in the second segment + writer.updateNumericDocValue(new Term("id", "doc1"), "ndv", 5L); + try { + writer.close(); + fail("should not have succeeded updating a segment with no numeric DocValues field"); + } catch (UnsupportedOperationException e) { + // expected + writer.rollback(); + } + + dir.close(); + } + + @Test + public void testUpdateSegmentWithPostingButNoDocValues() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + // prevent merges, otherwise by the time updates are applied + // (writer.close()), the segments might have merged and that update becomes + // legit. 
+ conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); + IndexWriter writer = new IndexWriter(dir, conf); + + // first segment with NDV + Document doc = new Document(); + doc.add(new StringField("id", "doc0", Store.NO)); + doc.add(new StringField("ndv", "mock-value", Store.NO)); + doc.add(new NumericDocValuesField("ndv", 5)); + writer.addDocument(doc); + writer.commit(); + + // second segment with no NDV + doc = new Document(); + doc.add(new StringField("id", "doc1", Store.NO)); + doc.add(new StringField("ndv", "mock-value", Store.NO)); + writer.addDocument(doc); + writer.commit(); + + // update document in the second segment + writer.updateNumericDocValue(new Term("id", "doc1"), "ndv", 5L); + try { + writer.close(); + fail("should not have succeeded updating a segment with no numeric DocValues field"); + } catch (UnsupportedOperationException e) { + // expected + writer.rollback(); + } + + dir.close(); + } + + @Test + public void testUpdateNumericDVFieldWithSameNameAsPostingField() throws Exception { + // this used to fail because FieldInfos.Builder neglected to update + // globalFieldMaps.docValueTypes map + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("f", "mock-value", Store.NO)); + doc.add(new NumericDocValuesField("f", 5)); + writer.addDocument(doc); + writer.commit(); + writer.updateNumericDocValue(new Term("f", "mock-value"), "f", 17L); + writer.close(); + + DirectoryReader r = DirectoryReader.open(dir); + NumericDocValues ndv = r.leaves().get(0).reader().getNumericDocValues("f"); + assertEquals(17, ndv.get(0)); + r.close(); + + dir.close(); + } + + @Test + public void testUpdateOldSegments() throws Exception { + Codec[] oldCodecs = new Codec[] { new Lucene40RWCodec(), new Lucene41RWCodec(), new Lucene42RWCodec() }; + Directory dir = newDirectory(); + + // create a segment with an old Codec + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setCodec(oldCodecs[random().nextInt(oldCodecs.length)]); + IndexWriter writer = new IndexWriter(dir, conf); + Document doc = new Document(); + doc.add(new StringField("id", "doc", Store.NO)); + doc.add(new NumericDocValuesField("f", 5)); + writer.addDocument(doc); + writer.close(); + + conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + writer = new IndexWriter(dir, conf); + writer.updateNumericDocValue(new Term("id", "doc"), "f", 4L); + OLD_FORMAT_IMPERSONATION_IS_ACTIVE = false; + try { + writer.close(); + fail("should not have succeeded in updating a segment written with an old Codec"); + } catch (UnsupportedOperationException e) { + writer.rollback(); + } finally { + OLD_FORMAT_IMPERSONATION_IS_ACTIVE = true; + } + + dir.close(); + } + + @Test + public void testStressMultiThreading() throws Exception { + final Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(2); + final IndexWriter writer = new IndexWriter(dir, conf); + + // create index + final int numThreads = atLeast(3); + final int numDocs = atLeast(2000); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + double group = random().nextDouble(); + String g; + if (group < 0.1) g = "g0"; + else if (group < 0.5) g = "g1"; + else if (group < 0.8) g = "g2"; + else g = "g3"; + doc.add(new 
StringField("updKey", g, Store.NO)); + doc.add(new StringField("id", "doc" + i, Store.NO)); + for (int j = 0; j < numThreads; j++) { + long value = random().nextInt(); + doc.add(new NumericDocValuesField("f" + j, value)); + doc.add(new NumericDocValuesField("cf" + j, value * 2)); // control, always updated to f * 2 + } + writer.addDocument(doc); + } + + final CountDownLatch done = new CountDownLatch(numThreads); + + // same thread updates a field as well as reopens + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < threads.length; i++) { + final String f = "f" + i; + final String cf = "cf" + i; + threads[i] = new Thread("UpdateThread-" + i) { + @Override + public void run() { + try { + int numUpdates = atLeast(40); + Random random = random(); + while (numUpdates-- > 0) { + double group = random.nextDouble(); + Term t; + if (group < 0.1) t = new Term("updKey", "g0"); + else if (group < 0.5) t = new Term("updKey", "g1"); + else if (group < 0.8) t = new Term("updKey", "g2"); + else t = new Term("updKey", "g3"); + long updValue = random.nextInt(); + writer.updateNumericDocValue(t, f, updValue); + writer.updateNumericDocValue(t, cf, updValue * 2); + + if (random.nextDouble() < 0.2) { + // delete a random document + int doc = random.nextInt(numDocs); + writer.deleteDocuments(new Term("id", "doc" + doc)); + } + + if (random.nextDouble() < 0.1) { + writer.commit(); // rarely commit + } + + if (random.nextDouble() < 0.3) { // obtain NRT reader (apply updates) + DirectoryReader.open(writer, true).close(); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + done.countDown(); + } + } + }; + } + + for (Thread t : threads) t.start(); + done.await(); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + for (int i = 0; i < numThreads; i++) { + NumericDocValues ndv = r.getNumericDocValues("f" + i); + NumericDocValues control = r.getNumericDocValues("cf" + i); + Bits liveDocs = r.getLiveDocs(); + for (int j = 0; j < r.maxDoc(); j++) { + if (liveDocs == null || liveDocs.get(j)) { + assertEquals(control.get(j), ndv.get(j) * 2); + } + } + } + } + reader.close(); + + dir.close(); + } + + @Test + public void testUpdateDifferentDocsInDifferentGens() throws Exception { + // update different documents in different generations + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(4); + IndexWriter writer = new IndexWriter(dir, conf); + final int numDocs = atLeast(10); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc" + i, Store.NO)); + long value = random().nextInt(); + doc.add(new NumericDocValuesField("f", value)); + doc.add(new NumericDocValuesField("cf", value * 2)); + writer.addDocument(doc); + } + + int numGens = atLeast(5); + for (int i = 0; i < numGens; i++) { + int doc = random().nextInt(numDocs); + Term t = new Term("id", "doc" + doc); + long value = random().nextLong(); + writer.updateNumericDocValue(t, "f", value); + writer.updateNumericDocValue(t, "cf", value * 2); + DirectoryReader reader = DirectoryReader.open(writer, true); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + NumericDocValues fndv = r.getNumericDocValues("f"); + NumericDocValues cfndv = r.getNumericDocValues("cf"); + for (int j = 0; j < r.maxDoc(); j++) { + assertEquals(cfndv.get(j), fndv.get(j) * 2); + } + } + 
reader.close(); + } + writer.close(); + dir.close(); + } + + @Test + public void testChangeCodec() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setCodec(new Lucene45Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new Lucene45DocValuesFormat(); + } + }); + IndexWriter writer = new IndexWriter(dir, conf.clone()); + Document doc = new Document(); + doc.add(new StringField("id", "d0", Store.NO)); + doc.add(new NumericDocValuesField("f1", 5L)); + doc.add(new NumericDocValuesField("f2", 13L)); + writer.addDocument(doc); + writer.close(); + + // change format + conf.setCodec(new Lucene45Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new AssertingDocValuesFormat(); + } + }); + writer = new IndexWriter(dir, conf.clone()); + doc = new Document(); + doc.add(new StringField("id", "d1", Store.NO)); + doc.add(new NumericDocValuesField("f1", 17L)); + doc.add(new NumericDocValuesField("f2", 2L)); + writer.addDocument(doc); + writer.updateNumericDocValue(new Term("id", "d0"), "f1", 12L); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + NumericDocValues f1 = r.getNumericDocValues("f1"); + NumericDocValues f2 = r.getNumericDocValues("f2"); + assertEquals(12L, f1.get(0)); + assertEquals(13L, f2.get(0)); + assertEquals(17L, f1.get(1)); + assertEquals(2L, f2.get(1)); + reader.close(); + dir.close(); + } + +} Index: lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java (revision 1523412) +++ lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java (working copy) @@ -90,7 +90,7 @@ SegmentReader mergedReader = new SegmentReader(new SegmentInfoPerCommit( new SegmentInfo(mergedDir, Constants.LUCENE_MAIN_VERSION, mergedSegment, docsMerged, false, codec, null, null), - 0, -1L), + 0, -1L, -1L), newIOContext(random())); assertTrue(mergedReader != null); assertTrue(mergedReader.numDocs() == 2); Index: lucene/misc/src/java/org/apache/lucene/index/IndexSplitter.java =================================================================== --- lucene/misc/src/java/org/apache/lucene/index/IndexSplitter.java (revision 1523412) +++ lucene/misc/src/java/org/apache/lucene/index/IndexSplitter.java (working copy) @@ -141,7 +141,7 @@ SegmentInfo newInfo = new SegmentInfo(destFSDir, info.getVersion(), info.name, info.getDocCount(), info.getUseCompoundFile(), info.getCodec(), info.getDiagnostics(), info.attributes()); - destInfos.add(new SegmentInfoPerCommit(newInfo, infoPerCommit.getDelCount(), infoPerCommit.getDelGen())); + destInfos.add(new SegmentInfoPerCommit(newInfo, infoPerCommit.getDelCount(), infoPerCommit.getDelGen(), infoPerCommit.getDocValuesGen())); // now copy files over Collection files = infoPerCommit.files(); for (final String srcName : files) { Index: lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java =================================================================== --- lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java (revision 1523412) +++ lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java (working copy) @@ -37,7 +37,6 @@ import org.apache.lucene.index.MergePolicy; import 
org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.RandomIndexWriter; -import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.SlowCompositeReaderWrapper; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; @@ -121,9 +120,23 @@ iw1.commit(); iw2.commit(); final Document doc = randomDocument(); - iw1.addDocument(doc); - iw2.addDocument(doc); + // NOTE: don't use RIW.addDocument directly, since it sometimes commits + // which may trigger a merge, in which case forceMerge may not do anything. + // With field updates this is a problem, since the updates can go into the + // single segment in the index, and therefore the index won't be sorted. + // This breaks the test's later assumption that the index is sorted + // by SortingMP. + iw1.w.addDocument(doc); + iw2.w.addDocument(doc); + if (defaultCodecSupportsFieldUpdates()) { + // update NDV of docs belonging to one term (covers many documents) + final long value = random().nextLong(); + final String term = RandomPicks.randomFrom(random(), terms); + iw1.w.updateNumericDocValue(new Term("s", term), "ndv", value); + iw2.w.updateNumericDocValue(new Term("s", term), "ndv", value); + } + iw1.forceMerge(1); iw2.forceMerge(1); iw1.close(); @@ -144,7 +157,7 @@ private static void assertSorted(AtomicReader reader) throws IOException { final NumericDocValues ndv = reader.getNumericDocValues("ndv"); for (int i = 1; i < reader.maxDoc(); ++i) { - assertTrue(ndv.get(i-1) <= ndv.get(i)); + assertTrue("ndv(" + (i-1) + ")=" + ndv.get(i-1) + ",ndv(" + i + ")=" + ndv.get(i), ndv.get(i-1) <= ndv.get(i)); } } @@ -154,6 +167,7 @@ assertSorted(sortedReader1); assertSorted(sortedReader2); + assertReaderEquals("", sortedReader1, sortedReader2); } Index: lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (revision 1523412) +++ lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (working copy) @@ -1382,6 +1382,15 @@ } return true; } + + /** Returns true if the codec "supports" field updates. */ + public static boolean defaultCodecSupportsFieldUpdates() { + String name = Codec.getDefault().getName(); + if (name.equals("Lucene40") || name.equals("Lucene41") || name.equals("Lucene42")) { + return false; + } + return true; + } public void assertReaderEquals(String info, IndexReader leftReader, IndexReader rightReader) throws IOException { assertReaderStatisticsEquals(info, leftReader, rightReader);