Index: lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java --- lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java Mon Jan 10 16:16:03 2011 -0500 @@ -30,9 +30,9 @@ public abstract class PostingsConsumer { - /** Adds a new doc in this term. Return null if this - * consumer doesn't need to see the positions for this - * doc. */ + /** Adds a new doc in this term. If this field omits term + * freqs & positions then termDocFreq should be ignored, + * and, finishDoc will not be called. */ public abstract void startDoc(int docID, int termDocFreq) throws IOException; public static class PostingsMergeState { @@ -49,7 +49,8 @@ public abstract void addPosition(int position, BytesRef payload) throws IOException; /** Called when we are done adding positions & payloads - * for each doc */ + * for each doc. Not called when the field omits term + * freq and positions. */ public abstract void finishDoc() throws IOException; /** Default merge impl: append documents, mapping around Index: lucene/src/java/org/apache/lucene/index/codecs/PrefixCodedTermsReader.java --- lucene/src/java/org/apache/lucene/index/codecs/PrefixCodedTermsReader.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/codecs/PrefixCodedTermsReader.java Mon Jan 10 16:16:03 2011 -0500 @@ -356,7 +356,7 @@ if (useCache) { cachedState = termsCache.get(fieldTerm); if (cachedState != null) { - state.copy(cachedState); + state.copyFrom(cachedState); seekPending = true; positioned = false; bytesReader.term.copy(term); Index: lucene/src/java/org/apache/lucene/index/codecs/TermState.java --- lucene/src/java/org/apache/lucene/index/codecs/TermState.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/codecs/TermState.java Mon Jan 10 16:16:03 2011 -0500 @@ -33,7 +33,7 @@ public long filePointer; // fp into the terms dict primary file (_X.tis) public int docFreq; // how many docs have this term - public void copy(TermState other) { + public void copyFrom(TermState other) { ord = other.ord; filePointer = other.filePointer; docFreq = other.docFreq; Index: lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsReaderImpl.java --- lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsReaderImpl.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsReaderImpl.java Mon Jan 10 16:16:03 2011 -0500 @@ -19,14 +19,14 @@ import java.io.IOException; +import org.apache.lucene.index.DocsAndPositionsEnum; import org.apache.lucene.index.DocsEnum; import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.DocsAndPositionsEnum; +import org.apache.lucene.index.codecs.PostingsReaderBase; import org.apache.lucene.index.codecs.TermState; -import org.apache.lucene.index.codecs.PostingsReaderBase; -import org.apache.lucene.index.codecs.pulsing.PulsingPostingsWriterImpl.Document; -import org.apache.lucene.index.codecs.pulsing.PulsingPostingsWriterImpl.Position; +import org.apache.lucene.store.ByteArrayDataInput; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CodecUtil; @@ -43,7 +43,7 @@ // Fallback reader for non-pulsed terms: final PostingsReaderBase wrappedPostingsReader; - int maxPulsingDocFreq; + int maxPositions; public PulsingPostingsReaderImpl(PostingsReaderBase wrappedPostingsReader) throws IOException { this.wrappedPostingsReader = wrappedPostingsReader; @@ -53,38 +53,50 @@ public void init(IndexInput termsIn) throws IOException { CodecUtil.checkHeader(termsIn, PulsingPostingsWriterImpl.CODEC, PulsingPostingsWriterImpl.VERSION_START, PulsingPostingsWriterImpl.VERSION_START); - maxPulsingDocFreq = termsIn.readVInt(); wrappedPostingsReader.init(termsIn); } private static class PulsingTermState extends TermState { - private Document docs[]; + private byte[] postings; + private int postingsSize; // -1 if this term was not inlined private TermState wrappedTermState; private boolean pendingIndexTerm; + @Override public Object clone() { PulsingTermState clone; clone = (PulsingTermState) super.clone(); - clone.docs = docs.clone(); - for(int i=0;i>> 1; // shift off low bit + if ((code & 1) != 0) { // if low bit is set + freq = 1; // freq is one + } else { + freq = postings.readVInt(); // else read freq } + + // Skip positions + if (storePayloads) { + int payloadLength = -1; + for(int pos=0;pos>> 1; // shift off low bit + if ((code & 1) != 0) { // if low bit is set + freq = 1; // freq is one } else { - doc = state.docs[nextRead++]; - if (skipDocs == null || !skipDocs.get(doc.docID)) { - nextPosRead = 0; - return doc.docID; - } + freq = postings.readVInt(); // else read freq + } + posPending = freq; + + if (skipDocs == null || !skipDocs.get(docID)) { + //System.out.println(" return docID=" + docID + " freq=" + freq); + position = 0; + return docID; } } } @Override public int freq() { - return doc.numPositions; + return freq; } @Override public int docID() { - return doc.docID; + return docID; } @Override @@ -351,22 +375,68 @@ } @Override - public int nextPosition() { - assert nextPosRead < doc.numPositions; - pos = doc.positions[nextPosRead++]; - payloadRetrieved = false; - return pos.pos; + public int nextPosition() throws IOException { + //System.out.println("PR d&p nextPosition posPending=" + posPending + " vs freq=" + freq); + + assert posPending > 0; + posPending--; + + if (storePayloads) { + if (!payloadRetrieved) { + //System.out.println("PR skip payload=" + payloadLength); + postings.skipBytes(payloadLength); + } + final int code = postings.readVInt(); + //System.out.println("PR code=" + code); + if ((code & 1) != 0) { + payloadLength = postings.readVInt(); + //System.out.println("PR new payload len=" + payloadLength); + } + position += code >> 1; + payloadRetrieved = false; + } else { + position += postings.readVInt(); + } + + //System.out.println("PR d&p nextPos return pos=" + position + " this=" + this); + return position; + } + + private void skipPositions() throws IOException { + while(posPending != 0) { + nextPosition(); + } + if (storePayloads && !payloadRetrieved) { + //System.out.println(" skip payload len=" + payloadLength); + postings.skipBytes(payloadLength); + payloadRetrieved = true; + } } @Override public boolean hasPayload() { - return !payloadRetrieved && pos.payload != null && pos.payload.length > 0; + return storePayloads && !payloadRetrieved && payloadLength > 0; } @Override - public BytesRef getPayload() { + public BytesRef getPayload() throws IOException { + //System.out.println("PR getPayload payloadLength=" + payloadLength + " this=" + this); + if (payloadRetrieved) { + throw new IOException("Either no payload exists at this term position or an attempt was made to load it more than once."); + } payloadRetrieved = true; - return pos.payload; + if (payloadLength > 0) { + if (payload == null) { + payload = new BytesRef(payloadLength); + } else { + payload.grow(payloadLength); + } + postings.readBytes(payload.bytes, 0, payloadLength); + payload.length = payloadLength; + return payload; + } else { + return null; + } } } Index: lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsWriterImpl.java --- lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsWriterImpl.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsWriterImpl.java Mon Jan 10 16:16:03 2011 -0500 @@ -20,12 +20,11 @@ import java.io.IOException; import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.util.CodecUtil; import org.apache.lucene.index.codecs.PostingsWriterBase; import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.store.RAMOutputStream; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.CodecUtil; // TODO: we now pulse entirely according to docFreq of the // term; it might be better to eg pulse by "net bytes used" @@ -44,67 +43,21 @@ final static int VERSION_CURRENT = VERSION_START; - IndexOutput termsOut; + private IndexOutput termsOut; - boolean omitTF; - boolean storePayloads; + private boolean omitTF; + private boolean storePayloads; - // Starts a new term - FieldInfo fieldInfo; + // one entry per position + private final Position[] pending; + private int pendingCount = 0; // -1 once we've hit too many positions + private Position currentDoc; // first Position entry of current doc - /** @lucene.experimental */ - public static class Document { + private static final class Position { + BytesRef payload; + int termFreq; // only incremented on first position for a given doc + int pos; int docID; - int termDocFreq; - int numPositions; - Position[] positions; - Document() { - positions = new Position[1]; - positions[0] = new Position(); - } - - @Override - public Object clone() { - Document doc = new Document(); - doc.docID = docID; - doc.termDocFreq = termDocFreq; - doc.numPositions = numPositions; - doc.positions = new Position[positions.length]; - for(int i = 0; i < positions.length; i++) { - doc.positions[i] = (Position) positions[i].clone(); - } - - return doc; - } - - void reallocPositions(int minSize) { - final Position[] newArray = new Position[ArrayUtil.oversize(minSize, RamUsageEstimator.NUM_BYTES_OBJECT_REF)]; - System.arraycopy(positions, 0, newArray, 0, positions.length); - for(int i=positions.length;i maxPulsingDocFreq docs - - static class Position { - BytesRef payload; - int pos; - - @Override - public Object clone() { - Position position = new Position(); - position.pos = pos; - if (payload != null) { - position.payload = new BytesRef(payload); - } - return position; - } } // TODO: -- lazy init this? ie, if every single term @@ -112,18 +65,19 @@ // Fallback writer for non-pulsed terms: final PostingsWriterBase wrappedPostingsWriter; - /** If docFreq <= maxPulsingDocFreq, its postings are + /** If the total number of positions (summed across all docs + * for this term) is <= maxPositions, then the postings are * inlined into terms dict */ - public PulsingPostingsWriterImpl(int maxPulsingDocFreq, PostingsWriterBase wrappedPostingsWriter) throws IOException { + public PulsingPostingsWriterImpl(int maxPositions, PostingsWriterBase wrappedPostingsWriter) throws IOException { super(); - pendingDocs = new Document[maxPulsingDocFreq]; - for(int i=0;i= the cutoff: this.wrappedPostingsWriter = wrappedPostingsWriter; } @@ -131,14 +85,13 @@ public void start(IndexOutput termsOut) throws IOException { this.termsOut = termsOut; CodecUtil.writeHeader(termsOut, CODEC, VERSION_CURRENT); - termsOut.writeVInt(pendingDocs.length); wrappedPostingsWriter.start(termsOut); } @Override public void startTerm() { - assert pendingDocCount == 0; - pulsed = false; + //System.out.println("PW startTerm"); + assert pendingCount == 0; } // TODO: -- should we NOT reuse across fields? would @@ -148,7 +101,7 @@ // our parent calls setField whenever the field changes @Override public void setField(FieldInfo fieldInfo) { - this.fieldInfo = fieldInfo; + //System.out.println("PW field=" + fieldInfo.name); omitTF = fieldInfo.omitTermFreqAndPositions; storePayloads = fieldInfo.storePayloads; wrappedPostingsWriter.setField(fieldInfo); @@ -156,65 +109,46 @@ @Override public void startDoc(int docID, int termDocFreq) throws IOException { + assert docID >= 0: "got docID=" + docID; + //System.out.println("PW doc=" + docID); - assert docID >= 0: "got docID=" + docID; - - if (!pulsed && pendingDocCount == pendingDocs.length) { - - // OK we just crossed the threshold, this term should - // now be written with our wrapped codec: - wrappedPostingsWriter.startTerm(); - - // Flush all buffered docs - for(int i=0;i 0) { - assert storePayloads; - wrappedPostingsWriter.addPosition(pos.pos, pos.payload); - } else { - wrappedPostingsWriter.addPosition(pos.pos, null); - } - } - wrappedPostingsWriter.finishDoc(); - } - } - - pendingDocCount = 0; - - pulsed = true; + if (pendingCount == pending.length) { + push(); } - if (pulsed) { + if (pendingCount != -1) { + assert pendingCount < pending.length; + currentDoc = pending[pendingCount]; + currentDoc.docID = docID; + if (omitTF) { + pendingCount++; + } else { + currentDoc.termFreq = termDocFreq; + } + } else { // We've already seen too many docs for this term -- // just forward to our fallback writer wrappedPostingsWriter.startDoc(docID, termDocFreq); - } else { - currentDoc = pendingDocs[pendingDocCount++]; - currentDoc.docID = docID; - // TODO: -- need not store in doc? only used for alloc & assert - currentDoc.termDocFreq = termDocFreq; - if (termDocFreq > currentDoc.positions.length) { - currentDoc.reallocPositions(termDocFreq); - } - currentDoc.numPositions = 0; } } @Override public void addPosition(int position, BytesRef payload) throws IOException { - if (pulsed) { + + //System.out.println("PW pos=" + position + " payload=" + (payload == null ? "null" : payload.length + " bytes")); + if (pendingCount == pending.length) { + push(); + } + + if (pendingCount == -1) { + // We've already seen too many docs for this term -- + // just forward to our fallback writer wrappedPostingsWriter.addPosition(position, payload); } else { - // just buffer up - Position pos = currentDoc.positions[currentDoc.numPositions++]; + // buffer up + final Position pos = pending[pendingCount++]; pos.pos = position; + pos.docID = currentDoc.docID; if (payload != null && payload.length > 0) { if (pos.payload == null) { pos.payload = new BytesRef(payload); @@ -229,86 +163,137 @@ @Override public void finishDoc() throws IOException { - assert omitTF || currentDoc.numPositions == currentDoc.termDocFreq; - if (pulsed) { + //System.out.println("PW finishDoc"); + if (pendingCount == -1) { wrappedPostingsWriter.finishDoc(); } } - boolean pendingIsIndexTerm; + private boolean pendingIsIndexTerm; - int pulsedCount; - int nonPulsedCount; + private final RAMOutputStream buffer = new RAMOutputStream(); /** Called when we are done adding docs to this term */ @Override public void finishTerm(int docCount, boolean isIndexTerm) throws IOException { + //System.out.println("PW finishTerm docCount=" + docCount); - assert docCount > 0; + assert pendingCount > 0 || pendingCount == -1; pendingIsIndexTerm |= isIndexTerm; - if (pulsed) { + if (pendingCount == -1) { + termsOut.writeByte((byte) 0); wrappedPostingsWriter.finishTerm(docCount, pendingIsIndexTerm); pendingIsIndexTerm = false; - pulsedCount++; } else { - nonPulsedCount++; - // OK, there were few enough occurrences for this + + // There were few enough total occurrences for this // term, so we fully inline our postings data into // terms dict, now: - int lastDocID = 0; - for(int i=0;i doc.docID; + wrappedPostingsWriter.finishDoc(); + doc = pos; + wrappedPostingsWriter.startDoc(doc.docID, doc.termFreq); + } + wrappedPostingsWriter.addPosition(pos.pos, pos.payload); + } + wrappedPostingsWriter.finishDoc(); + } else { + for(Position doc : pending) { + wrappedPostingsWriter.startDoc(doc.docID, 0); + } + } + pendingCount = -1; + } } Index: lucene/src/java/org/apache/lucene/index/codecs/sep/SepPostingsReaderImpl.java --- lucene/src/java/org/apache/lucene/index/codecs/sep/SepPostingsReaderImpl.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/codecs/sep/SepPostingsReaderImpl.java Mon Jan 10 16:16:03 2011 -0500 @@ -141,8 +141,8 @@ return other; } - public void copy(TermState _other) { - super.copy(_other); + public void copyFrom(TermState _other) { + super.copyFrom(_other); SepTermState other = (SepTermState) _other; docIndex.set(other.docIndex); } Index: lucene/src/java/org/apache/lucene/index/codecs/standard/StandardPostingsReader.java --- lucene/src/java/org/apache/lucene/index/codecs/standard/StandardPostingsReader.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/java/org/apache/lucene/index/codecs/standard/StandardPostingsReader.java Mon Jan 10 16:16:03 2011 -0500 @@ -90,12 +90,12 @@ public Object clone() { DocTermState other = new DocTermState(); - other.copy(this); + other.copyFrom(this); return other; } - public void copy(TermState _other) { - super.copy(_other); + public void copyFrom(TermState _other) { + super.copyFrom(_other); DocTermState other = (DocTermState) _other; freqOffset = other.freqOffset; proxOffset = other.proxOffset; @@ -785,6 +785,7 @@ if (payloadLength > payload.bytes.length) { payload.grow(payloadLength); } + proxIn.readBytes(payload.bytes, 0, payloadLength); payload.length = payloadLength; payloadPending = false; Index: lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java Mon Jan 10 16:16:03 2011 -0500 @@ -0,0 +1,56 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @lucene.experimental */ +public final class ByteArrayDataInput extends DataInput { + + private byte[] bytes; + private int pos; + + // TODO: allow BytesRef (slice) too + public ByteArrayDataInput(byte[] bytes) { + this.bytes = bytes; + } + + public void reset(byte[] bytes) { + this.bytes = bytes; + pos = 0; + } + + public boolean eof() { + return pos == bytes.length; + } + + public void skipBytes(int count) { + pos += count; + } + + // NOTE: AIOOBE not EOF if you read too much + @Override + public byte readByte() { + return bytes[pos++]; + } + + // NOTE: AIOOBE not EOF if you read too much + @Override + public void readBytes(byte[] b, int offset, int len) { + System.arraycopy(bytes, pos, b, offset, len); + pos += len; + } +} Index: lucene/src/test/org/apache/lucene/index/TestIndexReader.java --- lucene/src/test/org/apache/lucene/index/TestIndexReader.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexReader.java Mon Jan 10 16:16:03 2011 -0500 @@ -329,6 +329,7 @@ // add 100 documents with term : aaa writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer())); + writer.setInfoStream(VERBOSE ? System.out : null); for (int i = 0; i < 100; i++) { addDoc(writer, searchTerm.text()); } Index: lucene/src/test/org/apache/lucene/index/TestNRTThreads.java --- lucene/src/test/org/apache/lucene/index/TestNRTThreads.java Mon Jan 10 13:30:16 2011 -0500 +++ lucene/src/test/org/apache/lucene/index/TestNRTThreads.java Mon Jan 10 16:16:03 2011 -0500 @@ -349,6 +349,7 @@ assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs()); writer.close(false); + _TestUtil.checkIndex(dir); dir.close(); _TestUtil.rmDir(tempDir); docs.close();